diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a27a00e..b211be55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [2.6.2] - 2026-??-?? - Fix [#561](https://github.com/Neoteroi/BlackSheep/issues/561). +- Change `FromBodyBinder.binder_types` from a constructor-local variable to a + class attribute, defaulting to `[JSONBinder]` only. Users can configure + additional formats by setting `FromBodyBinder.binder_types` (e.g. + `FromBodyBinder.binder_types = [JSONBinder, FormBinder]`) before the + application starts. - Add support for baking OpenAPI Specification files to disk, to support running with `PYTHONOPTIMIZE=2` (or `-OO`) where docstrings are stripped and cannot be used to enrich OpenAPI Documentation automatically. @@ -76,6 +81,46 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ) ``` +- Fix [#514](https://github.com/Neoteroi/BlackSheep/issues/514) — add support for + **multiple request body formats** on a single endpoint. + - New `MultiFormatBodyBinder`: holds an ordered list of inner `BodyBinder`s and + dispatches `get_value` to the first whose `matches_content_type` returns `True`. + Returns HTTP **415 Unsupported Media Type** when the binder is required and no + inner binder matches the request's `Content-Type`. + - **Union annotation syntax** — the normalization layer detects a union of + body-binder `BoundValue` types and automatically builds a + `MultiFormatBodyBinder`: + + ```python + async def create_item(data: FromJSON[Item] | FromForm[Item]) -> Item: ... + # optional variant + async def create_item(data: FromJSON[Item] | FromForm[Item] | None) -> Item: ... + ``` + + - New **`FromBody[T]`** convenience type: equivalent to + `FromJSON[T] | FromForm[T]`, useful when you want to accept both structured + formats without being explicit: + + ```python + async def create_item(data: FromBody[Item]) -> Item: ... + ``` + + - New **`FromXML[T]`** and **`XMLBinder`**: parse `application/xml` / `text/xml` + request bodies using [`defusedxml`](https://github.com/tiran/defusedxml), + protecting against **XXE injection**, **entity expansion (billion laughs)**, + and **DTD-based attacks**. Security exceptions propagate unmodified so the + application can distinguish attack attempts from ordinary malformed input. + Install the extra with `pip install blacksheep[xml]`. + - All new types compose freely: + + ```python + async def create_item(data: FromJSON[Item] | FromXML[Item] | FromForm[Item]): ... + ``` + + - **OpenAPI Specification** is generated correctly for all combinations: each + accepted content type appears as a separate entry under `requestBody.content`, + all referencing the same schema. + ## [2.6.1] - 2026-02-22 :cat: - Fix missing escaping in `multipart/form-data` filenames and content-disposition headers. diff --git a/blacksheep/__init__.py b/blacksheep/__init__.py index 56c0593e..ae2371d2 100644 --- a/blacksheep/__init__.py +++ b/blacksheep/__init__.py @@ -31,6 +31,7 @@ from .server.authorization import allow_anonymous as allow_anonymous from .server.authorization import auth as auth from .server.bindings import ClientInfo as ClientInfo +from .server.bindings import FromBody as FromBody from .server.bindings import FromBytes as FromBytes from .server.bindings import FromCookie as FromCookie from .server.bindings import FromFiles as FromFiles @@ -41,6 +42,7 @@ from .server.bindings import FromRoute as FromRoute from .server.bindings import FromServices as FromServices from .server.bindings import FromText as FromText +from .server.bindings import FromXML as FromXML from .server.bindings import ServerInfo as ServerInfo from .server.responses import ContentDispositionType as ContentDispositionType from .server.responses import FileInput as FileInput diff --git a/blacksheep/exceptions.pxd b/blacksheep/exceptions.pxd index 005ca220..662c9cf3 100644 --- a/blacksheep/exceptions.pxd +++ b/blacksheep/exceptions.pxd @@ -33,6 +33,10 @@ cdef class InvalidOperation(Exception): pass +cdef class UnsupportedMediaType(HTTPException): + pass + + cdef class FailedRequestError(HTTPException): cdef public str data diff --git a/blacksheep/exceptions.py b/blacksheep/exceptions.py index c570536d..c4241ae1 100644 --- a/blacksheep/exceptions.py +++ b/blacksheep/exceptions.py @@ -51,6 +51,11 @@ def __init__(self, message=None): super().__init__(409, message or "Conflict") +class UnsupportedMediaType(HTTPException): + def __init__(self, message=None): + super().__init__(415, message or "Unsupported media type") + + class RangeNotSatisfiable(HTTPException): def __init__(self): super().__init__(416, "Range not satisfiable") diff --git a/blacksheep/exceptions.pyx b/blacksheep/exceptions.pyx index aff73875..738f3635 100644 --- a/blacksheep/exceptions.pyx +++ b/blacksheep/exceptions.pyx @@ -58,6 +58,12 @@ cdef class Conflict(HTTPException): super().__init__(409, message or "Conflict") +cdef class UnsupportedMediaType(HTTPException): + + def __init__(self, message=None): + super().__init__(415, message or "Unsupported media type") + + cdef class RangeNotSatisfiable(HTTPException): def __init__(self): diff --git a/blacksheep/server/bindings/__init__.py b/blacksheep/server/bindings/__init__.py index 4ad0cb3b..67bac957 100644 --- a/blacksheep/server/bindings/__init__.py +++ b/blacksheep/server/bindings/__init__.py @@ -27,7 +27,7 @@ from rodi import CannotResolveTypeException, ContainerProtocol from blacksheep.contents import FormPart -from blacksheep.exceptions import BadRequest +from blacksheep.exceptions import BadRequest, UnsupportedMediaType from blacksheep.messages import Request from blacksheep.server.bindings.converters import class_converters, converters from blacksheep.server.routing import Router, URLResolver @@ -199,6 +199,34 @@ async def create_profile(data: FromForm[UserProfile]): default_value_type = dict +class FromXML(BoundValue[T]): + """ + A parameter obtained from an XML request body (``application/xml`` or ``text/xml``). + + Requires ``defusedxml`` for safe parsing against common XML attacks (XXE, entity + expansion, DTD injection). Install it with:: + + pip install blacksheep[xml] + + The root element tag is ignored; its child elements are mapped to model fields by + name. All standard field-type coercions (int, float, datetime, UUID …) apply via + the same converter chain used by JSON and form binders. + """ + + default_value_type = dict + + +class FromBody(BoundValue[T]): + """ + A parameter obtained from the request body, accepting multiple content types. + By default accepts application/json and application/x-www-form-urlencoded / + multipart/form-data. For explicit format control use a union annotation such as + ``FromJSON[T] | FromForm[T]``, which is handled identically at runtime. + """ + + default_value_type = dict + + class FromFiles(BoundValue[list[FormPart]]): """ A parameter obtained from multipart/form-data files. @@ -580,6 +608,175 @@ async def read_data(self, request: Request) -> Any: return await request.text() +def _element_to_dict(element) -> dict: + """Recursively convert an ElementTree element to a plain dict. + + - Strips XML namespaces from tag names. + - Multiple sibling elements with the same tag are collected into a list. + - Attributes of the element are merged into the dict. + """ + result: dict = {} + + # Include element attributes first so child tags can override them if needed + for attr_name, attr_value in element.attrib.items(): + if "}" in attr_name: + attr_name = attr_name.split("}", 1)[1] + result[attr_name] = attr_value + + for child in element: + tag = child.tag + if "}" in tag: + tag = tag.split("}", 1)[1] + + child_value = _element_to_dict(child) if len(child) > 0 else child.text + + if tag in result: + existing = result[tag] + if not isinstance(existing, list): + result[tag] = [existing] + result[tag].append(child_value) + else: + result[tag] = child_value + + return result + + +class XMLBinder(BodyBinder): + """ + Extracts a model from an XML request body (``application/xml`` or ``text/xml``). + + Uses ``defusedxml`` to protect against XXE injection, entity expansion (billion + laughs), and DTD-based attacks. Install the extra with:: + + pip install blacksheep[xml] + + The root element tag is ignored; its direct children are mapped to model fields by + name. The same type-coercion converters used by the JSON and form binders apply, + so ``int``, ``float``, ``datetime``, ``UUID``, and other annotated field types are + automatically coerced from their string representation. + """ + + handle = FromXML + + @property + def content_type(self) -> str: + return "application/xml;text/xml" + + def matches_content_type(self, request: Request) -> bool: + return request.declares_content_type( + b"application/xml" + ) or request.declares_content_type(b"text/xml") + + async def read_data(self, request: Request) -> Any: + raw = await request.read() + if not raw: + return None + return self._parse_xml(raw) + + @staticmethod + def _parse_xml(content: bytes) -> dict: + try: + import defusedxml.ElementTree as _ET # type: ignore[import] + from defusedxml import DefusedXmlException # type: ignore[import] + except ImportError as exc: + raise ImportError( + "defusedxml is required for safe XML parsing. " + "Install it with: pip install blacksheep[xml]" + ) from exc + + try: + root = _ET.fromstring(content) + except DefusedXmlException: + raise # security violations (XXE, entity expansion, DTD) propagate as-is + except Exception as exc: + raise InvalidRequestBody(f"Invalid XML: {exc}") from exc + + return _element_to_dict(root) + + +class MultiFormatBodyBinder(BodyBinder): + """ + A BodyBinder that accepts multiple content types by delegating to a list of inner + BodyBinders. The first inner binder whose ``matches_content_type`` returns True is + used to read and parse the request body. + + Instances are created automatically by the normalization layer when a union of + body-binder annotations is used (e.g. ``FromJSON[T] | FromForm[T]``), or when + ``FromBody[T]`` is used. + """ + + def __init__( + self, + inner_binders: "list[BodyBinder]", + expected_type=None, + name: str = "body", + implicit: bool = False, + required: bool = False, + ): + # Pass a no-op converter so BodyBinder.__init__ doesn't build one unnecessarily; + # get_value is fully overridden and never calls self.converter. + super().__init__( + expected_type + or (inner_binders[0].expected_type if inner_binders else object), + name, + implicit, + required, + converter=lambda data: data, + ) + self.inner_binders = inner_binders + + @property + def content_type(self) -> str: + return ";".join(b.content_type for b in self.inner_binders) + + def matches_content_type(self, request: Request) -> bool: + return any(b.matches_content_type(request) for b in self.inner_binders) + + async def read_data(self, request: Request) -> Any: # pragma: no cover + raise NotImplementedError() + + async def get_value(self, request: Request) -> Any: + if request.method in self._excluded_methods: + return None + for binder in self.inner_binders: + if binder.matches_content_type(request): + return await binder.get_value(request) + if self.required: + if not request.has_body(): + raise MissingBodyError() + raise UnsupportedMediaType( + f"None of the supported content types matched the request. " + f"Accepted: {', '.join(b.content_type for b in self.inner_binders)}" + ) + return None + + +class FromBodyBinder(MultiFormatBodyBinder): + """ + Binder for ``FromBody[T]``. By default accepts only JSON bodies. + To support additional formats, configure the ``binder_types`` class attribute:: + + FromBodyBinder.binder_types = [JSONBinder, FormBinder] + """ + + handle = FromBody + binder_types: list[type[BodyBinder]] = [JSONBinder] + + def __init__( + self, + expected_type, + name: str = "body", + implicit: bool = False, + required: bool = False, + converter=None, + ): + inner = [ + binder_type(expected_type, name, implicit, required) + for binder_type in self.binder_types + ] + super().__init__(inner, expected_type, name, implicit, required) + + class BytesBinder(Binder): handle = FromBytes diff --git a/blacksheep/server/normalization.py b/blacksheep/server/normalization.py index 3ea51433..747aa245 100644 --- a/blacksheep/server/normalization.py +++ b/blacksheep/server/normalization.py @@ -29,11 +29,13 @@ from .bindings import ( Binder, + BinderNotRegisteredForValueType, BodyBinder, BoundValue, ControllerBinder, IdentityBinder, JSONBinder, + MultiFormatBodyBinder, QueryBinder, RouteBinder, ServiceBinder, @@ -321,6 +323,58 @@ def _get_bound_value_type(bound_type: Type[BoundValue]) -> Type[Any]: return value_type +def _get_multi_body_union_args(annotation: Any) -> "list | None": + """ + If *annotation* is a union whose non-None members are all BoundValue types handled + by a BodyBinder subclass, returns those non-None args. Otherwise returns None. + + Handles both ``typing.Union[A, B]`` and PEP 604 ``A | B`` syntax. + Requires at least two non-None body-binder args (a single-arg union like + ``FromJSON[T] | None`` is handled by the existing Optional unwrap logic). + """ + is_union = ( + hasattr(annotation, "__origin__") and annotation.__origin__ is Union + ) or _is_union_type(annotation) + if not is_union: + return None + + args = annotation.__args__ + non_none = [a for a in args if a is not type(None)] + + if len(non_none) < 2: + return None + + for arg in non_none: + try: + if not _is_bound_value_annotation(arg): + return None + binder_cls = get_binder_by_type(arg) + if not issubclass(binder_cls, BodyBinder): + return None + except (BinderNotRegisteredForValueType, AttributeError): + return None + + return non_none + + +def _build_multi_format_binder( + args: list, name: str, required: bool +) -> MultiFormatBodyBinder: + """Construct a MultiFormatBodyBinder from a list of BoundValue annotations.""" + inner_binders: list[BodyBinder] = [] + for arg in args: + binder_cls = get_binder_by_type(arg) + expected_type = _get_bound_value_type(arg) + inner_binders.append(binder_cls(expected_type, name, False, required)) + return MultiFormatBodyBinder( + inner_binders, + inner_binders[0].expected_type, + name, + implicit=True, + required=required, + ) + + def _get_parameter_binder( parameter: ParamInfo, services: ContainerProtocol, @@ -337,6 +391,13 @@ def _get_parameter_binder( if original_annotation is _empty: return _get_parameter_binder_without_annotation(services, route, name) + # Detect a union of BodyBinder-handled BoundValue types before the generic + # _check_union, which would raise a NormalizationError for such unions. + multi_args = _get_multi_body_union_args(original_annotation) + if multi_args is not None: + has_none = type(None) in original_annotation.__args__ + return _build_multi_format_binder(multi_args, name, required=not has_none) + # unwrap the Optional[] annotation, if present: is_root_optional, annotation = _check_union(parameter, original_annotation, method) diff --git a/pyproject.toml b/pyproject.toml index c582cdaf..0e61859b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,11 +49,13 @@ version = { attr = "blacksheep.__version__" } [project.optional-dependencies] jinja = ["Jinja2~=3.1.6"] +xml = ["defusedxml>=0.7.1"] full = [ "cryptography>=45.0.2,<47.0.0", "PyJWT~=2.10.1", "websockets~=15.0.1", "Jinja2~=3.1.6", + "defusedxml>=0.7.1", ] [project.urls] diff --git a/requirements.pypy.txt b/requirements.pypy.txt index a3ebff09..1331304d 100644 --- a/requirements.pypy.txt +++ b/requirements.pypy.txt @@ -19,4 +19,4 @@ pytest-cov==6.1.1 uvicorn==0.34.2 pydantic==2.12.3 pydantic_core==2.41.4 - +defusedxml>=0.7.1 diff --git a/requirements.txt b/requirements.txt index 4a7a85a3..e3927cbe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,3 +24,4 @@ pytest-cov==6.1.1 uvicorn==0.34.2 pydantic==2.12.3 pydantic_core==2.41.4 +defusedxml>=0.7.1 diff --git a/tests/test_bindings.py b/tests/test_bindings.py index 61cdd219..247869e3 100644 --- a/tests/test_bindings.py +++ b/tests/test_bindings.py @@ -1249,3 +1249,245 @@ async def test_from_body_json_binding_empty_dict(expected_type): assert value.age == 25 assert isinstance(value.contacts, dict) assert len(value.contacts) == 0 + + +# region MultiFormatBodyBinder tests + +from blacksheep.exceptions import UnsupportedMediaType as UnsupportedMediaTypeExc +from blacksheep.server.bindings import FormBinder, MultiFormatBodyBinder + + +@dataclass +class MultiItem: + name: str + value: int + + +async def test_multi_format_binder_dispatches_to_json(): + binder = MultiFormatBodyBinder( + [JSONBinder(MultiItem, "body", False, True), FormBinder(MultiItem, "body", False, True)], + MultiItem, + "body", + required=True, + ) + request = Request( + "POST", b"/", [(b"content-type", b"application/json")] + ).with_content(JSONContent({"name": "test", "value": 42})) + + result = await binder.get_value(request) + assert isinstance(result, MultiItem) + assert result.name == "test" + assert result.value == 42 + + +async def test_multi_format_binder_dispatches_to_form(): + binder = MultiFormatBodyBinder( + [JSONBinder(MultiItem, "body", False, True), FormBinder(MultiItem, "body", False, True)], + MultiItem, + "body", + required=True, + ) + request = Request( + "POST", b"/", [(b"content-type", b"application/x-www-form-urlencoded")] + ).with_content(FormContent({"name": "test", "value": "42"})) + + result = await binder.get_value(request) + assert isinstance(result, MultiItem) + assert result.name == "test" + + +async def test_multi_format_binder_raises_415_for_unsupported_content_type(): + binder = MultiFormatBodyBinder( + [JSONBinder(MultiItem, "body", False, True), FormBinder(MultiItem, "body", False, True)], + MultiItem, + "body", + required=True, + ) + from blacksheep.contents import Content + + request = Request( + "POST", b"/", [(b"content-type", b"application/xml")] + ).with_content(Content(b"application/xml", b"")) + + with pytest.raises(UnsupportedMediaTypeExc): + await binder.get_value(request) + + +async def test_multi_format_binder_returns_none_when_optional_and_no_match(): + binder = MultiFormatBodyBinder( + [JSONBinder(MultiItem, "body", False, False), FormBinder(MultiItem, "body", False, False)], + MultiItem, + "body", + required=False, + ) + from blacksheep.contents import Content + + request = Request( + "POST", b"/", [(b"content-type", b"application/xml")] + ).with_content(Content(b"application/xml", b"")) + + result = await binder.get_value(request) + assert result is None + + +async def test_multi_format_binder_skips_excluded_methods(): + binder = MultiFormatBodyBinder( + [JSONBinder(dict, "body", False, False)], + dict, + "body", + required=False, + ) + for method in ("GET", "HEAD", "TRACE"): + request = Request( + method, b"/", [(b"content-type", b"application/json")] + ).with_content(JSONContent({"x": 1})) + result = await binder.get_value(request) + assert result is None + + +def test_multi_format_binder_content_type_combines_inner(): + binder = MultiFormatBodyBinder( + [JSONBinder(dict, "body"), FormBinder(dict, "body")], + dict, + "body", + ) + parts = binder.content_type.split(";") + assert "application/json" in parts + assert "multipart/form-data" in parts + assert "application/x-www-form-urlencoded" in parts + + +# endregion + + +# region XMLBinder tests + +from blacksheep.contents import Content +from blacksheep.server.bindings import FromXML, XMLBinder + + +XML_ITEM = b"hello7" +XML_NESTED = b"1" +XML_ATTR = b'attr' +XML_LIST = b"ab" + + +async def test_xml_binder_parses_simple_fields(): + binder = XMLBinder(MultiItem, "body", required=True) + request = Request( + "POST", b"/", [(b"content-type", b"application/xml")] + ).with_content(Content(b"application/xml", XML_ITEM)) + + result = await binder.get_value(request) + assert isinstance(result, MultiItem) + assert result.name == "hello" + assert result.value == 7 # coerced from string + + +async def test_xml_binder_accepts_text_xml_content_type(): + binder = XMLBinder(MultiItem, "body", required=True) + request = Request( + "POST", b"/", [(b"content-type", b"text/xml")] + ).with_content(Content(b"text/xml", XML_ITEM)) + + result = await binder.get_value(request) + assert isinstance(result, MultiItem) + + +async def test_xml_binder_does_not_match_json_content_type(): + binder = XMLBinder(MultiItem, "body", required=False) + request = Request( + "POST", b"/", [(b"content-type", b"application/json")] + ).with_content(JSONContent({"name": "x", "value": 1})) + + result = await binder.get_value(request) + assert result is None + + +async def test_xml_binder_raises_bad_request_for_malformed_xml(): + binder = XMLBinder(dict, "body", required=True) + request = Request( + "POST", b"/", [(b"content-type", b"application/xml")] + ).with_content(Content(b"application/xml", b"' + b']>' + b"&xxe;1" + ) + with pytest.raises( + (defusedxml.DTDForbidden, defusedxml.EntitiesForbidden, defusedxml.ExternalReferenceForbidden) + ): + XMLBinder._parse_xml(xxe_payload) + + +def test_xml_binder_rejects_billion_laughs(): + """Verify defusedxml blocks entity expansion (billion laughs).""" + import defusedxml + + billion_laughs = ( + b'' + b"' + b' ' + b"]>" + b"&lol2;1" + ) + with pytest.raises((defusedxml.DTDForbidden, defusedxml.EntitiesForbidden)): + XMLBinder._parse_xml(billion_laughs) + + +def test_element_to_dict_handles_attributes(): + from blacksheep.server.bindings import _element_to_dict + import xml.etree.ElementTree as ET + + root = ET.fromstring(XML_ATTR) + d = _element_to_dict(root) + assert d["id"] == "99" + assert d["name"] == "attr" + + +def test_element_to_dict_handles_nested(): + from blacksheep.server.bindings import _element_to_dict + import xml.etree.ElementTree as ET + + root = ET.fromstring(XML_NESTED) + d = _element_to_dict(root) + assert isinstance(d["inner"], dict) + assert d["inner"]["x"] == "1" + + +def test_element_to_dict_collects_repeated_tags_as_list(): + from blacksheep.server.bindings import _element_to_dict + import xml.etree.ElementTree as ET + + root = ET.fromstring(XML_LIST) + d = _element_to_dict(root) + assert d["tag"] == ["a", "b"] + + +def test_xml_binder_content_type(): + binder = XMLBinder(dict, "body") + parts = binder.content_type.split(";") + assert "application/xml" in parts + assert "text/xml" in parts + + +# endregion diff --git a/tests/test_normalization.py b/tests/test_normalization.py index d5752424..b69efb25 100644 --- a/tests/test_normalization.py +++ b/tests/test_normalization.py @@ -673,3 +673,108 @@ async def example_2() -> AsyncIterable[str]: assert get_asyncgen_yield_type(example_1) is int assert get_asyncgen_yield_type(example_2) is str + + +# region MultiFormatBodyBinder normalization tests + +from dataclasses import dataclass + +from blacksheep import FormContent, JSONContent, Request +from blacksheep.server.bindings import FormBinder, FromBody, FromBodyBinder, FromForm, MultiFormatBodyBinder + + +@dataclass +class NormItem: + name: str + value: int + + +def test_union_body_annotation_creates_multi_format_binder(): + """FromJSON[T] | FromForm[T] should produce a MultiFormatBodyBinder.""" + + def handler(data: FromJSON[NormItem] | FromForm[NormItem]): ... + + binders = get_binders(Route(b"/", handler), Container()) + body_binder = next(b for b in binders if isinstance(b, MultiFormatBodyBinder)) + assert body_binder.expected_type is NormItem + assert body_binder.required is True + binder_types = {type(b) for b in body_binder.inner_binders} + assert JSONBinder in binder_types + assert FormBinder in binder_types + + +def test_optional_union_body_annotation_creates_optional_multi_format_binder(): + """FromJSON[T] | FromForm[T] | None should produce an optional MultiFormatBodyBinder.""" + + def handler(data: FromJSON[NormItem] | FromForm[NormItem] | None): ... + + binders = get_binders(Route(b"/", handler), Container()) + body_binder = next(b for b in binders if isinstance(b, MultiFormatBodyBinder)) + assert body_binder.required is False + + +def test_from_body_annotation_creates_multi_format_binder(monkeypatch): + """FromBody[T] should produce a MultiFormatBodyBinder accepting JSON and form.""" + monkeypatch.setattr(FromBodyBinder, "binder_types", [JSONBinder, FormBinder]) + + def handler(data: FromBody[NormItem]): ... + + binders = get_binders(Route(b"/", handler), Container()) + body_binder = next(b for b in binders if isinstance(b, MultiFormatBodyBinder)) + assert body_binder.expected_type is NormItem + binder_types = {type(b) for b in body_binder.inner_binders} + assert JSONBinder in binder_types + assert FormBinder in binder_types + + +async def test_union_body_binder_dispatches_correctly_in_handler(): + """End-to-end: handler with union annotation parses JSON and form requests.""" + + async def handler(data: FromJSON[NormItem] | FromForm[NormItem]): + return data + + from blacksheep.server.normalization import normalize_handler + + route = Route(b"/", handler) + normalized = normalize_handler(route, Container()) + + json_request = Request( + "POST", b"/", [(b"content-type", b"application/json")] + ).with_content(JSONContent({"name": "hello", "value": 1})) + result = await normalized(json_request) + assert result is not None + + form_request = Request( + "POST", b"/", [(b"content-type", b"application/x-www-form-urlencoded")] + ).with_content(FormContent({"name": "hello", "value": "1"})) + result = await normalized(form_request) + assert result is not None + + +def test_regular_optional_body_not_treated_as_multi_format(): + """FromJSON[T] | None should still work as a regular optional single binder.""" + + def handler(data: FromJSON[NormItem] | None): ... + + binders = get_binders(Route(b"/", handler), Container()) + body_binder = next(b for b in binders if isinstance(b, JSONBinder)) + # root_required=False marks the parameter as optional at the handler level + assert body_binder.root_required is False + assert not isinstance(body_binder, MultiFormatBodyBinder) + + +def test_union_with_xml_creates_multi_format_binder(): + """FromJSON[T] | FromXML[T] should produce a MultiFormatBodyBinder.""" + from blacksheep.server.bindings import FromXML, XMLBinder + + def handler(data: FromJSON[NormItem] | FromXML[NormItem]): ... + + binders = get_binders(Route(b"/", handler), Container()) + body_binder = next(b for b in binders if isinstance(b, MultiFormatBodyBinder)) + assert body_binder.expected_type is NormItem + binder_types = {type(b) for b in body_binder.inner_binders} + assert JSONBinder in binder_types + assert XMLBinder in binder_types + + +# endregion diff --git a/tests/test_openapi_v3.py b/tests/test_openapi_v3.py index 307023bf..08640649 100644 --- a/tests/test_openapi_v3.py +++ b/tests/test_openapi_v3.py @@ -4713,3 +4713,123 @@ async def get_foxes(): assert (tmp_path / "openapi.json").exists() assert (tmp_path / "openapi.yaml").exists() assert "/foxes" in _json.loads(docs._json_docs)["paths"] + + +# region MultiFormatBodyBinder OpenAPI tests + + +async def test_multi_format_union_body_generates_all_content_types( + docs: OpenAPIHandler, serializer: Serializer +): + """Union annotation FromJSON[T] | FromForm[T] should document all content types.""" + from blacksheep.server.bindings import FromJSON + + app = get_app() + + @app.router.post("/items") + def create_item(data: FromJSON[CreateFooInput] | FromForm[CreateFooInput]) -> Foo: ... + + docs.bind_app(app) + await app.start() + + yaml = serializer.to_yaml(docs.generate_documentation(app)) + + assert "application/json:" in yaml + assert "multipart/form-data:" in yaml + assert "application/x-www-form-urlencoded:" in yaml + # All three content types reference the same schema + assert yaml.count("$ref: '#/components/schemas/CreateFooInput'") == 3 + assert "required: true" in yaml + + +async def test_from_body_generates_json_and_form_content_types( + docs: OpenAPIHandler, serializer: Serializer, monkeypatch +): + """FromBody[T] should document both JSON and form content types.""" + from blacksheep.server.bindings import FormBinder, FromBody, FromBodyBinder, JSONBinder + + monkeypatch.setattr(FromBodyBinder, "binder_types", [JSONBinder, FormBinder]) + + app = get_app() + + @app.router.post("/items") + def create_item(data: FromBody[CreateFooInput]) -> Foo: ... + + docs.bind_app(app) + await app.start() + + yaml = serializer.to_yaml(docs.generate_documentation(app)) + + assert "application/json:" in yaml + assert "multipart/form-data:" in yaml + assert "application/x-www-form-urlencoded:" in yaml + assert yaml.count("$ref: '#/components/schemas/CreateFooInput'") == 3 + assert "required: true" in yaml + + +async def test_optional_union_body_generates_not_required( + docs: OpenAPIHandler, serializer: Serializer +): + """FromJSON[T] | FromForm[T] | None should emit required: false.""" + from blacksheep.server.bindings import FromJSON + + app = get_app() + + @app.router.post("/items") + def create_item( + data: FromJSON[CreateFooInput] | FromForm[CreateFooInput] | None, + ) -> Foo: ... + + docs.bind_app(app) + await app.start() + + yaml = serializer.to_yaml(docs.generate_documentation(app)) + + assert "application/json:" in yaml + assert "required: false" in yaml + + +async def test_xml_body_binder_generates_xml_content_types( + docs: OpenAPIHandler, serializer: Serializer +): + """FromXML[T] should document application/xml and text/xml content types.""" + from blacksheep.server.bindings import FromXML + + app = get_app() + + @app.router.post("/items") + def create_item(data: FromXML[CreateFooInput]) -> Foo: ... + + docs.bind_app(app) + await app.start() + + yaml = serializer.to_yaml(docs.generate_documentation(app)) + + assert "application/xml:" in yaml + assert "text/xml:" in yaml + assert "$ref: '#/components/schemas/CreateFooInput'" in yaml + + +async def test_json_xml_union_generates_all_content_types( + docs: OpenAPIHandler, serializer: Serializer +): + """FromJSON[T] | FromXML[T] should document both JSON and XML content types.""" + from blacksheep.server.bindings import FromJSON, FromXML + + app = get_app() + + @app.router.post("/items") + def create_item(data: FromJSON[CreateFooInput] | FromXML[CreateFooInput]) -> Foo: ... + + docs.bind_app(app) + await app.start() + + yaml = serializer.to_yaml(docs.generate_documentation(app)) + + assert "application/json:" in yaml + assert "application/xml:" in yaml + assert "text/xml:" in yaml + assert yaml.count("$ref: '#/components/schemas/CreateFooInput'") == 3 + + +# endregion