From 9852599ccde95335c1b2906a6daa88bcfd7967d5 Mon Sep 17 00:00:00 2001 From: "devin-ai-integration[bot]" Date: Sat, 2 May 2026 11:57:30 +0000 Subject: [PATCH 1/4] build: add ijson as a runtime dependency Adds the ijson streaming JSON parser as a direct dependency so connectors that ship inside the source-declarative-manifest base image can stream-parse very large JSON response bodies without materializing the full document in memory. Motivation: source-amazon-seller-partner currently OOMs while reading GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT documents that can exceed 3 GB uncompressed. See airbytehq/oncall#12143. --- poetry.lock | 108 ++++++++++++++++++++++++++++++++++++++++++++++++- pyproject.toml | 1 + 2 files changed, 108 insertions(+), 1 deletion(-) diff --git a/poetry.lock b/poetry.lock index 55c2fa668..2016c77d9 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2170,6 +2170,112 @@ files = [ [package.extras] all = ["flake8 (>=7.1.1)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "ruff (>=0.6.2)"] +[[package]] +name = "ijson" +version = "3.5.0" +description = "Iterative JSON parser with standard Python iterator interfaces" +optional = false +python-versions = ">=3.9" +groups = ["main"] +markers = "python_version <= \"3.11\" or python_version >= \"3.12\"" +files = [ + {file = "ijson-3.5.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:ea8dcac10d86adaeead454bc25c97b68d0bda573d5fd6f86f5e21cf8f7906f88"}, + {file = "ijson-3.5.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:92b0495bbb2150bbf14fc5d98fb6d76bcd1c526605a172709e602e6fedc96495"}, + {file = "ijson-3.5.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:7af0c4c8943be8b09a4e57bdc1da6001dae7b36526d4154fe5c8224738d0921f"}, + {file = "ijson-3.5.0-cp310-cp310-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:45887d5e84ff0d2b138c926cebd9071830733968afe8d9d12080b3c178c7f918"}, + {file = "ijson-3.5.0-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9a70b575be8e57a28c80e90ed349ad3a851c3478524c70e36e07d6092ecd12c9"}, + {file = "ijson-3.5.0-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2adeecd45830bfd5580ca79a584154713aabef0b9607e16249133df5d2859813"}, + {file = "ijson-3.5.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:d873e72889e7fc5962ab58909f1adff338d7c2f49e450e5b5fe844eff8155a14"}, + {file = "ijson-3.5.0-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:9a88c559456a79708592234d697645d92b599718f4cbbeaa6515f83ac63ca0ae"}, + {file = "ijson-3.5.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:cf83f58ad50dc0d39a2105cb26d4f359b38f42cef68b913170d4d47d97d97ba5"}, + {file = "ijson-3.5.0-cp310-cp310-win32.whl", hash = "sha256:aec4580a7712a19b1f95cd41bed260fc6a31266d37ef941827772a4c199e8143"}, + {file = "ijson-3.5.0-cp310-cp310-win_amd64.whl", hash = "sha256:9a9c4c70501e23e8eb1675330686d1598eebfa14b6f0dbc8f00c2e081cc628fa"}, + {file = "ijson-3.5.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:5616311404b858d32740b7ad8b9a799c62165f5ecb85d0a8ed16c21665a90533"}, + {file = "ijson-3.5.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:e9733f94029dd41702d573ef64752e2556e72aea14623d6dbb7a44ca1ccf30fd"}, + {file = "ijson-3.5.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:db8398c6721b98412a4f618da8022550c8b9c5d9214040646071b5deb4d4a393"}, + {file = "ijson-3.5.0-cp311-cp311-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:c061314845c08163b1784b6076ea5f075372461a32e6916f4e5f211fd4130b64"}, + {file = "ijson-3.5.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1111a1c5ac79119c5d6e836f900c1a53844b50a18af38311baa6bb61e2645aca"}, + {file = "ijson-3.5.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1e74aff8c681c24002b61b1822f9511d4c384f324f7dbc08c78538e01fdc9fcb"}, + {file = "ijson-3.5.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:739a7229b1b0cc5f7e2785a6e7a5fc915e850d3fed9588d0e89a09f88a417253"}, + {file = "ijson-3.5.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:ef88712160360cab3ca6471a4e5418243f8b267cf1fe1620879d1b5558babc71"}, + {file = "ijson-3.5.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:6ca0d1b6b5f8166a6248f4309497585fb8553b04bc8179a0260fad636cfdb798"}, + {file = "ijson-3.5.0-cp311-cp311-win32.whl", hash = "sha256:966039cf9047c7967febf7b9a52ec6f38f5464a4c7fbb5565e0224b7376fefff"}, + {file = "ijson-3.5.0-cp311-cp311-win_amd64.whl", hash = "sha256:6bad6a1634cb7c9f3f4c7e52325283b35b565f5b6cc27d42660c6912ce883422"}, + {file = "ijson-3.5.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:1ebefbe149a6106cc848a3eaf536af51a9b5ccc9082de801389f152dba6ab755"}, + {file = "ijson-3.5.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:19e30d9f00f82e64de689c0b8651b9cfed879c184b139d7e1ea5030cec401c21"}, + {file = "ijson-3.5.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a04a33ee78a6f27b9b8528c1ca3c207b1df3b8b867a4cf2fcc4109986f35c227"}, + {file = "ijson-3.5.0-cp312-cp312-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:7d48dc2984af02eb3c56edfb3f13b3f62f2f3e4fe36f058c8cfc75d93adf4fed"}, + {file = "ijson-3.5.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f1e73a44844d9adbca9cf2c4132cd875933e83f3d4b23881fcaf82be83644c7d"}, + {file = "ijson-3.5.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:7389a56b8562a19948bdf1d7bae3a2edc8c7f86fb59834dcb1c4c722818e645a"}, + {file = "ijson-3.5.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:3176f23f8ebec83f374ed0c3b4e5a0c4db7ede54c005864efebbed46da123608"}, + {file = "ijson-3.5.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:6babd88e508630c6ef86c9bebaaf13bb2fb8ec1d8f8868773a03c20253f599bc"}, + {file = "ijson-3.5.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:dc1b3836b174b6db2fa8319f1926fb5445abd195dc963368092103f8579cb8ed"}, + {file = "ijson-3.5.0-cp312-cp312-win32.whl", hash = "sha256:6673de9395fb9893c1c79a43becd8c8fbee0a250be6ea324bfd1487bb5e9ee4c"}, + {file = "ijson-3.5.0-cp312-cp312-win_amd64.whl", hash = "sha256:f4f7fabd653459dcb004175235f310435959b1bb5dfa8878578391c6cc9ad944"}, + {file = "ijson-3.5.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:e9cedc10e40dd6023c351ed8bfc7dcfce58204f15c321c3c1546b9c7b12562a4"}, + {file = "ijson-3.5.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:3647649f782ee06c97490b43680371186651f3f69bebe64c6083ee7615d185e5"}, + {file = "ijson-3.5.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:90e74be1dce05fce73451c62d1118671f78f47c9f6be3991c82b91063bf01fc9"}, + {file = "ijson-3.5.0-cp313-cp313-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:78e9ad73e7be2dd80627504bd5cbf512348c55ce2c06e362ed7683b5220e8568"}, + {file = "ijson-3.5.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9577449313cc94be89a4fe4b3e716c65f09cc19636d5a6b2861c4e80dddebd58"}, + {file = "ijson-3.5.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3e4c1178fb50aff5f5701a30a5152ead82a14e189ce0f6102fa1b5f10b2f54ff"}, + {file = "ijson-3.5.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:0eb402ab026ffb37a918d75af2b7260fe6cfbce13232cc83728a714dd30bd81d"}, + {file = "ijson-3.5.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:5b08ee08355f9f729612a8eb9bf69cc14f9310c3b2a487c6f1c3c65d85216ec4"}, + {file = "ijson-3.5.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:bda62b6d48442903e7bf56152108afb7f0f1293c2b9bef2f2c369defea76ab18"}, + {file = "ijson-3.5.0-cp313-cp313-win32.whl", hash = "sha256:8d073d9b13574cfa11083cc7267c238b7a6ed563c2661e79192da4a25f09c82c"}, + {file = "ijson-3.5.0-cp313-cp313-win_amd64.whl", hash = "sha256:2419f9e32e0968a876b04d8f26aeac042abd16f582810b576936bbc4c6015069"}, + {file = "ijson-3.5.0-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:4d4b0cd676b8c842f7648c1a783448fac5cd3b98289abd83711b3e275e143524"}, + {file = "ijson-3.5.0-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:252dec3680a48bb82d475e36b4ae1b3a9d7eb690b951bb98a76c5fe519e30188"}, + {file = "ijson-3.5.0-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:aa1b5dca97d323931fde2501172337384c958914d81a9dac7f00f0d4bfc76bc7"}, + {file = "ijson-3.5.0-cp313-cp313t-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:7a5ec7fd86d606094bba6f6f8f87494897102fa4584ef653f3005c51a784c320"}, + {file = "ijson-3.5.0-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:009f41443e1521847701c6d87fa3923c0b1961be3c7e7de90947c8cb92ea7c44"}, + {file = "ijson-3.5.0-cp313-cp313t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e4c3651d1f9fe2839a93fdf8fd1d5ca3a54975349894249f3b1b572bcc4bd577"}, + {file = "ijson-3.5.0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:945b7abcfcfeae2cde17d8d900870f03536494245dda7ad4f8d056faa303256c"}, + {file = "ijson-3.5.0-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:0574b0a841ff97495c13e9d7260fbf3d85358b061f540c52a123db9dbbaa2ed6"}, + {file = "ijson-3.5.0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:f969ffb2b89c5cdf686652d7fb66252bc72126fa54d416317411497276056a18"}, + {file = "ijson-3.5.0-cp313-cp313t-win32.whl", hash = "sha256:59d3f9f46deed1332ad669518b8099920512a78bda64c1f021fcd2aff2b36693"}, + {file = "ijson-3.5.0-cp313-cp313t-win_amd64.whl", hash = "sha256:5c2839fa233746d8aad3b8cd2354e441613f5df66d721d59da4a09394bd1db2b"}, + {file = "ijson-3.5.0-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:25a5a6b2045c90bb83061df27cfa43572afa43ba9408611d7bfe237c20a731a9"}, + {file = "ijson-3.5.0-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:8976c54c0b864bc82b951bae06567566ac77ef63b90a773a69cd73aab47f4f4f"}, + {file = "ijson-3.5.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:859eb2038f7f1b0664df4241957694cc35e6295992d71c98659b22c69b3cbc10"}, + {file = "ijson-3.5.0-cp314-cp314-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:c911aa02991c7c0d3639b6619b93a93210ff1e7f58bf7225d613abea10adc78e"}, + {file = "ijson-3.5.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:903cbdc350173605220edc19796fbea9b2203c8b3951fb7335abfa8ed37afda8"}, + {file = "ijson-3.5.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a4549d96ded5b8efa71639b2160235415f6bdb8c83367615e2dbabcb72755c33"}, + {file = "ijson-3.5.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:6b2dcf6349e6042d83f3f8c39ce84823cf7577eba25bac5aae5e39bbbbbe9c1c"}, + {file = "ijson-3.5.0-cp314-cp314-musllinux_1_2_i686.whl", hash = "sha256:e44af39e6f8a17e5627dcd89715d8279bf3474153ff99aae031a936e5c5572e5"}, + {file = "ijson-3.5.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:9260332304b7e7828db56d43f08fc970a3ab741bf84ff10189361ea1b60c395b"}, + {file = "ijson-3.5.0-cp314-cp314-win32.whl", hash = "sha256:63bc8121bb422f6969ced270173a3fa692c29d4ae30c860a2309941abd81012a"}, + {file = "ijson-3.5.0-cp314-cp314-win_amd64.whl", hash = "sha256:01b6dad72b7b7df225ef970d334556dfad46c696a2c6767fb5d9ed8889728bca"}, + {file = "ijson-3.5.0-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:2ea4b676ec98e374c1df400a47929859e4fa1239274339024df4716e802aa7e4"}, + {file = "ijson-3.5.0-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:014586eec043e23c80be9a923c56c3a0920a0f1f7d17478ce7bc20ba443968ef"}, + {file = "ijson-3.5.0-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:d5b8b886b0248652d437f66e7c5ac318bbdcb2c7137a7e5327a68ca00b286f5f"}, + {file = "ijson-3.5.0-cp314-cp314t-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:498fd46ae2349297e43acf97cdc421e711dbd7198418677259393d2acdc62d78"}, + {file = "ijson-3.5.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:22a51b4f9b81f12793731cf226266d1de2112c3c04ba4a04117ad4e466897e05"}, + {file = "ijson-3.5.0-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9636c710dc4ac4a281baa266a64f323b4cc165cec26836af702c44328b59a515"}, + {file = "ijson-3.5.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:f7168a39e8211107666d71b25693fd1b2bac0b33735ef744114c403c6cac21e1"}, + {file = "ijson-3.5.0-cp314-cp314t-musllinux_1_2_i686.whl", hash = "sha256:8696454245415bc617ab03b0dc3ae4c86987df5dc6a90bad378fe72c5409d89e"}, + {file = "ijson-3.5.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:c21bfb61f71f191565885bf1bc29e0a186292d866b4880637b833848360bdc1b"}, + {file = "ijson-3.5.0-cp314-cp314t-win32.whl", hash = "sha256:a2619460d6795b70d0155e5bf016200ac8a63ab5397aa33588bb02b6c21759e6"}, + {file = "ijson-3.5.0-cp314-cp314t-win_amd64.whl", hash = "sha256:4f24b78d4ef028d17eb57ad1b16c0aed4a17bdd9badbf232dc5d9305b7e13854"}, + {file = "ijson-3.5.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:0ec62d397447cbe4941818c53e22b054e03250ff9cdbaea75144b11bc6db44ed"}, + {file = "ijson-3.5.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:75980237a16e5e36ad46fbdd33e3f3d817c187624974c48947df0a2bfa104b9e"}, + {file = "ijson-3.5.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:a9c321e8e1cdeac8aac698d09a90d98a049c9be8e8330c89cf2fcc517c96d51d"}, + {file = "ijson-3.5.0-cp39-cp39-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:92878b130d7ad71919c70b4f50ad23ec7fbf2d09a9c675f9179d49c4be869a63"}, + {file = "ijson-3.5.0-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a1ab890d43656c1d12c4a8dafb7fac5a2278ed3e4408102e0971f48b6ed4583d"}, + {file = "ijson-3.5.0-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a55185e8983fef0b21abc1a0bbaa11eeb2fabdc651e2167f23defa9fe4eb999b"}, + {file = "ijson-3.5.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:5a3af031e30751164c3289294f249f942535fbe7e8f35eb3ecc374247449214e"}, + {file = "ijson-3.5.0-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:f4c8f5ccf7230a9a94c1d836322783ed0c0ec2a151f3d53b2e0a67c89ad66970"}, + {file = "ijson-3.5.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:6e249796d2090afc1c42d2458ab0dbf0072a30ffa246b5683e3f7b9dc9b1b7f9"}, + {file = "ijson-3.5.0-cp39-cp39-win32.whl", hash = "sha256:1b2cf2c0c79313fbc607a0d90788ffb4f4614872983af4aa85c5b92533ec4da2"}, + {file = "ijson-3.5.0-cp39-cp39-win_amd64.whl", hash = "sha256:d38cb03f6b7cc26d542ff710adfe98e5f6d53878461c45456c97d3668297ec0d"}, + {file = "ijson-3.5.0-pp311-pypy311_pp73-macosx_10_15_x86_64.whl", hash = "sha256:d64c624da0e9d692d6eb0ff63a79656b59d76bf80773a17c5b0f835e4e8ef627"}, + {file = "ijson-3.5.0-pp311-pypy311_pp73-macosx_11_0_arm64.whl", hash = "sha256:876f7df73b7e0d6474f9caa729b9cdbfc8e76de9075a4887dfd689e29e85c4ca"}, + {file = "ijson-3.5.0-pp311-pypy311_pp73-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:e7dbff2c8d9027809b0cde663df44f3210da10ea377121d42896fb6ee405dd31"}, + {file = "ijson-3.5.0-pp311-pypy311_pp73-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:4217a1edc278660679e1197c83a1a2a2d367792bfbb2a3279577f4b59b93730d"}, + {file = "ijson-3.5.0-pp311-pypy311_pp73-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:04f0fc740311388ee745ba55a12292b722d6f52000b11acbb913982ba5fbdf87"}, + {file = "ijson-3.5.0-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:fdeee6957f92e0c114f65c55cf8fe7eabb80cfacab64eea6864060913173f66d"}, + {file = "ijson-3.5.0.tar.gz", hash = "sha256:94688760720e3f5212731b3cb8d30267f9a045fb38fb3870254e7b9504246f31"}, +] + [[package]] name = "importlib-metadata" version = "8.7.0" @@ -7045,4 +7151,4 @@ vector-db-based = ["cohere", "langchain_community", "langchain_core", "langchain [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.14" -content-hash = "b785d39f246498c8facd7854999dbdbfb78808489a09922dd3a1551be331ea7d" +content-hash = "50fd249190f0cfd134efa5df94efe41beddbf57ceb880babc02059a8451ac5ea" diff --git a/pyproject.toml b/pyproject.toml index bcdab217b..7a885b796 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -84,6 +84,7 @@ cryptography = ">=44.0.0,<45.0.0" # Constrained as transitive dependency due to pytz = "2024.2" pytest = {version = "^7", optional = true } orjson = "^3.10.7" +ijson = "^3.3.0" # Streaming JSON parser, used by connectors with very large response bodies (e.g. Amazon Seller Partner Brand Analytics reports). serpyco-rs = "^1.10.2" sqlalchemy = {version = "^2.0,!=2.0.36", optional = true } fastapi = { version = ">=0.116.1", optional = true } From 0b5cf79338aa20d15adde265ff6540d2724e0d81 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 14 May 2026 20:33:45 +0000 Subject: [PATCH 2/4] feat: add JsonItemsDecoder for streaming large JSON responses Adds a new declarative decoder, JsonItemsDecoder, that streams elements of a nested array out of a single JSON document one at a time using the ijson library. This lets manifest-only connectors decode multi-GB JSON responses (e.g. Amazon Seller Partner Brand Analytics reports) without loading the full document into memory. - New `JsonItemsParser` in composite_raw_decoder.py (wraps ijson.items) - New `JsonItemsDecoder` schema entry, wired into GzipDecoder / ZipfileDecoder / top-level decoder unions so it composes with the existing decoder hierarchy - Pydantic models regenerated from schema - Factory: create_json_items_decoder + JsonItemsDecoderModel handling in _get_parser - Drop ijson from deptry DEP002 ignore list now that the CDK imports it directly; update pyproject.toml comment to reflect first-class use - Unit tests covering top-level, nested, empty, encoding, gzip composition, missing path validation, and lazy streaming behavior --- .../declarative_component_schema.yaml | 39 ++ .../decoders/composite_raw_decoder.py | 28 ++ .../models/declarative_component_schema.py | 362 ++++++++++-------- .../parsers/model_to_component_factory.py | 18 + pyproject.toml | 2 +- .../decoders/test_composite_decoder.py | 122 ++++++ 6 files changed, 415 insertions(+), 156 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 6fbda8f00..6df8092be 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2628,6 +2628,40 @@ definitions: type: type: string enum: [JsonDecoder] + JsonItemsDecoder: + title: JSON Items (Streaming) + description: >- + Select 'JSON Items (Streaming)' to stream-decode a single JSON document + by yielding each element of a nested array, one at a time. Use this for + very large single-document JSON responses (e.g. a wrapping object + containing a multi-GB array) where buffering the whole document into + memory would cause out-of-memory errors. Powered by the `ijson` + streaming parser. + type: object + required: + - type + - items_path + properties: + type: + type: string + enum: [JsonItemsDecoder] + items_path: + title: Items Path + description: >- + Dot-separated path to the JSON array whose elements should be + yielded as records. Uses `ijson` path syntax (e.g. `data.users`), + not JSONPath syntax — do not include leading `$.` or trailing + `[*]`. + type: string + examples: + - dataByDepartmentAndSearchTerm + - dataByAsin + - data.users + encoding: + title: Encoding + description: Text encoding used to decode the streamed bytes before JSON parsing. + type: string + default: utf-8 JsonlDecoder: title: JSON Lines description: Select 'JSON Lines' if the response consists of JSON objects separated by new lines ('\n') in JSONL format. @@ -2869,6 +2903,7 @@ definitions: - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" + - "$ref": "#/definitions/JsonItemsDecoder" - "$ref": "#/definitions/JsonlDecoder" ListPartitionRouter: title: List Partition Router @@ -3909,6 +3944,7 @@ definitions: description: Component decoding the response so records can be extracted. anyOf: - "$ref": "#/definitions/JsonDecoder" + - "$ref": "#/definitions/JsonItemsDecoder" - "$ref": "#/definitions/XmlDecoder" - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/JsonlDecoder" @@ -3997,6 +4033,7 @@ definitions: - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" + - "$ref": "#/definitions/JsonItemsDecoder" - "$ref": "#/definitions/JsonlDecoder" CsvDecoder: title: CSV @@ -4163,6 +4200,7 @@ definitions: - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" + - "$ref": "#/definitions/JsonItemsDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" @@ -4175,6 +4213,7 @@ definitions: - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" + - "$ref": "#/definitions/JsonItemsDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 7acb5c1e2..c70fd6582 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -11,6 +11,7 @@ from io import BufferedIOBase, TextIOWrapper from typing import Any, List, Optional +import ijson import orjson import requests @@ -98,6 +99,33 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: logger.warning(f"Cannot decode/parse line {line!r} as JSON, error: {e}") +@dataclass +class JsonItemsParser(Parser): + """Streaming JSON parser that yields each element of a nested array. + + Use this for very large single-document JSON responses where the records + of interest live under a nested array (e.g. `dataByDepartmentAndSearchTerm`, + `data.users`). Powered by `ijson`, this parser does not materialize the + full document — peak memory is bounded by a single record plus ijson's + internal parse buffers, regardless of document size. + + `items_path` uses `ijson` dotted path syntax (e.g. `data.users`), not + JSONPath syntax (`$.data.users[*]`). Internally we append `.item`, which + is the `ijson` convention for "iterate elements of this array". + """ + + items_path: str = "" + encoding: Optional[str] = "utf-8" + + def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: + if not self.items_path: + raise ValueError("JsonItemsParser requires a non-empty items_path.") + # ijson auto-selects the best available backend (yajl2_c when present) + # and reads from `data` lazily — it does not call `.read()` on the + # whole stream up front. + yield from ijson.items(data, f"{self.items_path}.item") + + @dataclass class CsvParser(Parser): # TODO: migrate implementation to re-use file-base classes diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 931cef7f1..ddd3160f6 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -6,7 +6,7 @@ from enum import Enum from typing import Any, Dict, List, Literal, Optional, Union -from pydantic.v1 import BaseModel, Extra, Field +from pydantic.v1 import BaseModel, Extra, Field, conint from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import ( BaseModelWithDeprecations, @@ -18,12 +18,6 @@ class AuthFlowType(Enum): oauth1_0 = "oauth1.0" -class ScopesJoinStrategy(Enum): - space = "space" - comma = "comma" - plus = "plus" - - class BasicHttpAuthenticator(BaseModel): type: Literal["BasicHttpAuthenticator"] username: str = Field( @@ -57,10 +51,9 @@ class DynamicStreamCheckConfig(BaseModel): dynamic_stream_name: str = Field( ..., description="The dynamic stream name.", title="Dynamic Stream Name" ) - stream_count: Optional[int] = Field( + stream_count: Optional[conint(ge=1)] = Field( None, description="The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. Must be a positive integer; if it exceeds the total number of available streams, all streams are checked.", - ge=1, title="Stream Count", ) @@ -489,7 +482,7 @@ class Config: ) weight: Optional[Union[int, str]] = Field( None, - description="The weight of a request matching this matcher when acquiring a call from the rate limiter. Different endpoints can consume different amounts from a shared budget by specifying different weights. If not set, each request counts as 1.", + description="The weight of a request matching this matcher when acquiring a call from the rate limiter. Different endpoints can consume different amounts from a shared budget by specifying different weights. If not set, each request counts as 1.\n", title="Weight", ) @@ -504,6 +497,32 @@ class OnNoRecords(Enum): emit_parent = "emit_parent" +class RecordExpander(BaseModel): + type: Literal["RecordExpander"] + expand_records_from_field: List[str] = Field( + ..., + description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.", + examples=[ + ["lines", "data"], + ["items"], + ["nested", "array"], + ["sections", "*", "items"], + ], + title="Expand Records From Field", + ) + remain_original_record: Optional[bool] = Field( + False, + description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.', + title="Remain Original Record", + ) + on_no_records: Optional[OnNoRecords] = Field( + OnNoRecords.skip, + description='Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.', + title="On No Records", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ExponentialBackoffStrategy(BaseModel): type: Literal["ExponentialBackoffStrategy"] factor: Optional[Union[float, str]] = Field( @@ -660,6 +679,21 @@ class JsonDecoder(BaseModel): type: Literal["JsonDecoder"] +class JsonItemsDecoder(BaseModel): + type: Literal["JsonItemsDecoder"] + items_path: str = Field( + ..., + description="Dot-separated path to the JSON array whose elements should be yielded as records. Uses `ijson` path syntax (e.g. `data.users`), not JSONPath syntax — do not include leading `$.` or trailing `[*]`.", + examples=["dataByDepartmentAndSearchTerm", "dataByAsin", "data.users"], + title="Items Path", + ) + encoding: Optional[str] = Field( + "utf-8", + description="Text encoding used to decode the streamed bytes before JSON parsing.", + title="Encoding", + ) + + class JsonlDecoder(BaseModel): type: Literal["JsonlDecoder"] @@ -818,24 +852,38 @@ class NoPagination(BaseModel): type: Literal["NoPagination"] -class State(BaseModel): +class Scope(BaseModel): class Config: extra = Extra.allow - min: int - max: int + scope: str = Field( + ..., description="The OAuth scope string to request from the provider." + ) -class OAuthScope(BaseModel): +class OptionalScope(BaseModel): class Config: extra = Extra.allow scope: str = Field( - ..., - description="The OAuth scope string to request from the provider.", + ..., description="The OAuth scope string to request from the provider." ) +class ScopesJoinStrategy(Enum): + space = "space" + comma = "comma" + plus = "plus" + + +class State(BaseModel): + class Config: + extra = Extra.allow + + min: int + max: int + + class OauthConnectorInputSpecification(BaseModel): class Config: extra = Extra.allow @@ -855,17 +903,13 @@ class Config: examples=["user:read user:read_orders workspaces:read"], title="Scopes", ) - # NOTE: scopes, optional_scopes, and scopes_join_strategy are processed by the - # platform OAuth handler (DeclarativeOAuthSpecHandler.kt), not by the CDK runtime. - # The CDK schema defines the manifest contract; the platform reads these fields - # during the OAuth consent flow to build the authorization URL. - scopes: Optional[List[OAuthScope]] = Field( + scopes: Optional[List[Scope]] = Field( None, description="List of OAuth scope objects. When present, takes precedence over the `scope` string property.\nThe scope values are joined using the `scopes_join_strategy` (default: space) before being\nsent to the OAuth provider.", examples=[[{"scope": "user:read"}, {"scope": "user:write"}]], title="Scopes", ) - optional_scopes: Optional[List[OAuthScope]] = Field( + optional_scopes: Optional[List[OptionalScope]] = Field( None, description="Optional OAuth scope objects that may or may not be granted.", examples=[[{"scope": "admin:read"}]], @@ -960,24 +1004,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -995,7 +1043,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1238,7 +1288,14 @@ class AsyncJobStatusMap(BaseModel): completed: List[str] failed: List[str] timeout: List[str] - skipped: Optional[List[str]] = None + skipped: Optional[List[str]] = Field( + None, + description="Statuses that indicate the job was skipped because there is no data to return. Jobs with these statuses will not be retried and no records will be fetched.", + ) + + +class BlockSimultaneousSyncsAction(BaseModel): + type: Literal["BlockSimultaneousSyncsAction"] class ValueType(Enum): @@ -1500,7 +1557,9 @@ class CustomConfigTransformation(BaseModel): class_name: str = Field( ..., description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_..`.", - examples=["source_declarative_manifest.components.MyCustomConfigTransformation"], + examples=[ + "source_declarative_manifest.components.MyCustomConfigTransformation" + ], ) parameters: Optional[Dict[str, Any]] = Field( None, @@ -1928,7 +1987,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -2079,28 +2140,23 @@ class DefaultPaginator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class RecordExpander(BaseModel): - type: Literal["RecordExpander"] - expand_records_from_field: List[str] = Field( +class DpathExtractor(BaseModel): + type: Literal["DpathExtractor"] + field_path: List[str] = Field( ..., - description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.", + description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', examples=[ - ["lines", "data"], - ["items"], - ["nested", "array"], - ["sections", "*", "items"], + ["data"], + ["data", "records"], + ["data", "{{ parameters.name }}"], + ["data", "*", "record"], ], - title="Expand Records From Field", - ) - remain_original_record: Optional[bool] = Field( - False, - description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.', - title="Remain Original Record", + title="Field Path", ) - on_no_records: Optional[OnNoRecords] = Field( - OnNoRecords.skip, - description='Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.', - title="On No Records", + record_expander: Optional[RecordExpander] = Field( + None, + description="Optional component to expand records by extracting items from nested array fields.", + title="Record Expander", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2173,6 +2229,29 @@ class ListPartitionRouter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class RecordSelector(BaseModel): + type: Literal["RecordSelector"] + extractor: Union[DpathExtractor, CustomRecordExtractor] + record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( + None, + description="Responsible for filtering records to be emitted by the Source.", + title="Record Filter", + ) + schema_normalization: Optional[ + Union[SchemaNormalization, CustomSchemaNormalization] + ] = Field( + None, + description="Responsible for normalization according to the schema.", + title="Schema Normalization", + ) + transform_before_filtering: Optional[bool] = Field( + None, + description="If true, transformation will be applied before record filtering.", + title="Transform Before Filtering", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class PaginationReset(BaseModel): type: Literal["PaginationReset"] action: Action1 @@ -2181,7 +2260,7 @@ class PaginationReset(BaseModel): class GzipDecoder(BaseModel): type: Literal["GzipDecoder"] - decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder] + decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonItemsDecoder, JsonlDecoder] class RequestBodyGraphQL(BaseModel): @@ -2202,10 +2281,12 @@ class DpathValidator(BaseModel): ], title="Field Path", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( - ..., - description="The condition that the specified config value will be evaluated against", - title="Validation Strategy", + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( + Field( + ..., + description="The condition that the specified config value will be evaluated against", + title="Validation Strategy", + ) ) @@ -2222,10 +2303,12 @@ class PredicateValidator(BaseModel): ], title="Value", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( - ..., - description="The validation strategy to apply to the value.", - title="Validation Strategy", + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( + Field( + ..., + description="The validation strategy to apply to the value.", + title="Validation Strategy", + ) ) @@ -2250,12 +2333,12 @@ class ConfigAddFields(BaseModel): class CompositeErrorHandler(BaseModel): type: Literal["CompositeErrorHandler"] - error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = ( - Field( - ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", - ) + error_handlers: List[ + Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler] + ] = Field( + ..., + description="List of error handlers to iterate on to determine how to handle a failed response.", + title="Error Handlers", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2293,60 +2376,20 @@ class Config: ) -class DpathExtractor(BaseModel): - type: Literal["DpathExtractor"] - field_path: List[str] = Field( - ..., - description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', - examples=[ - ["data"], - ["data", "records"], - ["data", "{{ parameters.name }}"], - ["data", "*", "record"], - ], - title="Field Path", - ) - record_expander: Optional[RecordExpander] = Field( - None, - description="Optional component to expand records by extracting items from nested array fields.", - title="Record Expander", - ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class ZipfileDecoder(BaseModel): class Config: extra = Extra.allow type: Literal["ZipfileDecoder"] - decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder] = Field( + decoder: Union[ + CsvDecoder, GzipDecoder, JsonDecoder, JsonItemsDecoder, JsonlDecoder + ] = Field( ..., description="Parser to parse the decompressed data from the zipfile(s).", title="Parser", ) -class RecordSelector(BaseModel): - type: Literal["RecordSelector"] - extractor: Union[DpathExtractor, CustomRecordExtractor] - record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( - None, - description="Responsible for filtering records to be emitted by the Source.", - title="Record Filter", - ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( - None, - description="Responsible for normalization according to the schema.", - title="Schema Normalization", - ) - transform_before_filtering: Optional[bool] = Field( - None, - description="If true, transformation will be applied before record filtering.", - title="Transform Before Filtering", - ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class ConfigMigration(BaseModel): type: Literal["ConfigMigration"] description: Optional[str] = Field( @@ -2439,7 +2482,7 @@ class Config: api_budget: Optional[HTTPAPIBudget] = None stream_groups: Optional[Dict[str, StreamGroup]] = Field( None, - description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.", + description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.\n", title="Stream Groups", ) max_concurrent_async_job_count: Optional[Union[int, str]] = Field( @@ -2464,9 +2507,9 @@ class Config: type: Literal["DeclarativeSource"] check: Union[CheckStream, CheckDynamicStream] - streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = ( - None - ) + streams: Optional[ + List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]] + ] = None dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., @@ -2479,7 +2522,7 @@ class Config: api_budget: Optional[HTTPAPIBudget] = None stream_groups: Optional[Dict[str, StreamGroup]] = Field( None, - description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.", + description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.\n", title="Stream Groups", ) max_concurrent_async_job_count: Optional[Union[int, str]] = Field( @@ -2596,16 +2639,20 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" + ) retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) ) primary_key: Optional[PrimaryKey] = Field("", title="Primary Key") schema_loader: Optional[ @@ -2779,18 +2826,20 @@ class HttpRequester(BaseModelWithDeprecations): description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.", title="Query Properties", ) - request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( - None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", - examples=[ - {"unit": "day"}, - { - "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, - ], - title="Query Parameters", + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = ( + Field( + None, + description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + examples=[ + {"unit": "day"}, + { + "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + }, + {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, + {"sort_by[asc]": "updated_at"}, + ], + title="Query Parameters", + ) ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, @@ -2962,7 +3011,9 @@ class QueryProperties(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field(..., description="The stream name.", example=["Users"], title="Name") + name: str = Field( + ..., description="The stream name.", example=["Users"], title="Name" + ) full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -2975,7 +3026,7 @@ class StateDelegatingStream(BaseModel): ) api_retention_period: Optional[str] = Field( None, - description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n * **PT1H**: 1 hour\n * **P1D**: 1 day\n * **P1W**: 1 week\n * **P1M**: 1 month\n * **P1Y**: 1 year\n * **P30D**: 30 days\n", + description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n* **PT1H**: 1 hour\n* **P1D**: 1 day\n* **P1W**: 1 week\n* **P1M**: 1 month\n* **P1Y**: 1 year\n* **P30D**: 30 days\n", examples=["P30D", "P90D", "P1Y"], title="API Retention Period", ) @@ -2991,6 +3042,7 @@ class SimpleRetriever(BaseModel): decoder: Optional[ Union[ JsonDecoder, + JsonItemsDecoder, XmlDecoder, CsvDecoder, JsonlDecoder, @@ -3055,13 +3107,17 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( + download_target_extractor: Optional[ + Union[DpathExtractor, CustomRecordExtractor] + ] = Field( None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -3074,7 +3130,7 @@ class AsyncRetriever(BaseModel): None, description="The time in minutes after which the single Async Job should be considered as Timed Out.", ) - failed_retry_wait_time_in_seconds: Optional[Union[int, str]] = Field( + failed_retry_wait_time_in_seconds: Optional[Union[conint(ge=1), str]] = Field( None, description="Time in seconds to wait before retrying a failed async job. Only applies to jobs that ran on the API side and reported a FAILED status (e.g. report generation failed due to a cooldown). Creation failures (HTTP errors when starting a job, such as 429s) and TIMED_OUT jobs are retried immediately and are not affected by this setting. When set, the orchestrator defers retry of real failed jobs until the wait time has elapsed, without blocking other jobs.", ) @@ -3123,6 +3179,7 @@ class AsyncRetriever(BaseModel): CsvDecoder, GzipDecoder, JsonDecoder, + JsonItemsDecoder, JsonlDecoder, IterableDecoder, XmlDecoder, @@ -3139,6 +3196,7 @@ class AsyncRetriever(BaseModel): CsvDecoder, GzipDecoder, JsonDecoder, + JsonItemsDecoder, JsonlDecoder, IterableDecoder, XmlDecoder, @@ -3153,20 +3211,14 @@ class AsyncRetriever(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class BlockSimultaneousSyncsAction(BaseModel): - type: Literal["BlockSimultaneousSyncsAction"] - - class StreamGroup(BaseModel): - streams: List[str] = Field( + streams: List[DeclarativeStream] = Field( ..., - description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").', + description="List of references to streams that belong to this group.\n", title="Streams", ) action: BlockSimultaneousSyncsAction = Field( - ..., - description="The action to apply to streams in this group.", - title="Action", + ..., description="The action to apply to streams in this group.", title="Action" ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index bc9b4f82c..29f0ce10f 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -97,6 +97,7 @@ CompositeRawDecoder, CsvParser, GzipParser, + JsonItemsParser, JsonLineParser, JsonParser, Parser, @@ -321,6 +322,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JsonFileSchemaLoader as JsonFileSchemaLoaderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + JsonItemsDecoder as JsonItemsDecoderModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JsonlDecoder as JsonlDecoderModel, ) @@ -763,6 +767,7 @@ def _init_mappings(self) -> None: HttpResponseFilterModel: self.create_http_response_filter, InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, + JsonItemsDecoderModel: self.create_json_items_decoder, JsonlDecoderModel: self.create_jsonl_decoder, JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector, GzipDecoderModel: self.create_gzip_decoder, @@ -2656,6 +2661,14 @@ def create_jsonl_decoder( stream_response=False if self._emit_connector_builder_messages else True, ) + def create_json_items_decoder( + self, model: JsonItemsDecoderModel, config: Config, **kwargs: Any + ) -> Decoder: + return CompositeRawDecoder( + parser=ModelToComponentFactory._get_parser(model, config), + stream_response=False if self._emit_connector_builder_messages else True, + ) + def create_gzip_decoder( self, model: GzipDecoderModel, config: Config, **kwargs: Any ) -> Decoder: @@ -2704,6 +2717,11 @@ def _get_parser(model: BaseModel, config: Config) -> Parser: if isinstance(model, JsonDecoderModel): # Note that the logic is a bit different from the JsonDecoder as there is some legacy that is maintained to return {} on error cases return JsonParser() + elif isinstance(model, JsonItemsDecoderModel): + return JsonItemsParser( + items_path=model.items_path, + encoding=model.encoding or "utf-8", + ) elif isinstance(model, JsonlDecoderModel): return JsonLineParser() elif isinstance(model, CsvDecoderModel): diff --git a/pyproject.toml b/pyproject.toml index 7a885b796..ab6975e50 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -84,7 +84,7 @@ cryptography = ">=44.0.0,<45.0.0" # Constrained as transitive dependency due to pytz = "2024.2" pytest = {version = "^7", optional = true } orjson = "^3.10.7" -ijson = "^3.3.0" # Streaming JSON parser, used by connectors with very large response bodies (e.g. Amazon Seller Partner Brand Analytics reports). +ijson = "^3.3.0" # Streaming JSON parser used by `JsonItemsParser` to handle very large response bodies without OOMing. serpyco-rs = "^1.10.2" sqlalchemy = {version = "^2.0,!=2.0.36", optional = true } fastapi = { version = ">=0.116.1", optional = true } diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index d92d6c605..6b4658532 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -18,6 +18,7 @@ CompositeRawDecoder, CsvParser, GzipParser, + JsonItemsParser, JsonLineParser, JsonParser, ) @@ -362,3 +363,124 @@ def test_given_response_is_not_streamed_when_decode_then_can_be_called_multiple_ content_second_time = list(composite_raw_decoder.decode(response)) assert content == content_second_time + + +# --------------------------------------------------------------------------- +# JsonItemsParser +# --------------------------------------------------------------------------- + + +def _make_records(count: int) -> List[dict]: + return [{"id": i, "name": f"name-{i}"} for i in range(count)] + + +@pytest.mark.parametrize( + "payload, items_path, expected_count", + [ + pytest.param( + {"dataByDepartmentAndSearchTerm": _make_records(5)}, + "dataByDepartmentAndSearchTerm", + 5, + id="top_level_array", + ), + pytest.param( + {"data": {"users": _make_records(3)}}, + "data.users", + 3, + id="nested_array", + ), + pytest.param( + {"dataByAsin": []}, + "dataByAsin", + 0, + id="empty_array", + ), + ], +) +def test_json_items_parser_yields_each_item( + requests_mock, payload: dict, items_path: str, expected_count: int +) -> None: + requests_mock.register_uri( + "GET", "https://airbyte.io/", content=json.dumps(payload).encode("utf-8") + ) + response = requests.get("https://airbyte.io/", stream=True) + + decoder = CompositeRawDecoder(parser=JsonItemsParser(items_path=items_path)) + records = list(decoder.decode(response)) + + assert len(records) == expected_count + if expected_count: + # Records should be yielded in source order and match the payload. + expected_items = payload + for key in items_path.split("."): + expected_items = expected_items[key] + assert records == expected_items + + +@pytest.mark.parametrize("encoding", ["utf-8", "iso-8859-1"]) +def test_json_items_parser_honors_encoding(requests_mock, encoding: str) -> None: + payload = {"data": [{"name": "Hé"} for _ in range(3)]} + requests_mock.register_uri( + "GET", "https://airbyte.io/", content=json.dumps(payload).encode(encoding) + ) + response = requests.get("https://airbyte.io/", stream=True) + + decoder = CompositeRawDecoder(parser=JsonItemsParser(items_path="data", encoding=encoding)) + records = list(decoder.decode(response)) + + assert records == payload["data"] + + +def test_json_items_parser_composes_with_gzip(requests_mock) -> None: + payload = {"dataByAsin": _make_records(4)} + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=compress_with_gzip(json.dumps(payload)), + headers={"Content-Encoding": "gzip"}, + ) + response = requests.get("https://airbyte.io/", stream=True) + + parser = GzipParser(inner_parser=JsonItemsParser(items_path="dataByAsin")) + decoder = CompositeRawDecoder(parser=parser) + + assert list(decoder.decode(response)) == payload["dataByAsin"] + + +def test_json_items_parser_requires_items_path() -> None: + parser = JsonItemsParser() + with pytest.raises(ValueError, match="items_path"): + list(parser.parse(BytesIO(b'{"data": []}'))) + + +def test_json_items_parser_is_lazy() -> None: + """The parser should yield the first record before reading the entire stream.""" + + class _CountingStream: + """A file-like wrapper that counts how many bytes have been read so far.""" + + def __init__(self, content: bytes) -> None: + self._buffer = BytesIO(content) + self.total_size = len(content) + self.bytes_read = 0 + + def read(self, size: int = -1) -> bytes: + chunk = self._buffer.read(size) + self.bytes_read += len(chunk) + return chunk + + def readable(self) -> bool: # pragma: no cover - interface compliance + return True + + # Large enough that pulling one record cannot require reading the whole document. + payload = {"data": _make_records(10_000)} + raw = json.dumps(payload).encode("utf-8") + stream = _CountingStream(raw) + + parser = JsonItemsParser(items_path="data") + iterator = parser.parse(stream) # type: ignore[arg-type] + + # Pull one item — we should not have consumed the entire document by this point. + first = next(iterator) + assert first == {"id": 0, "name": "name-0"} + assert stream.bytes_read < stream.total_size From 180da70074be0ed98bee53cbc80befc9164db5ac Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 14 May 2026 20:36:04 +0000 Subject: [PATCH 3/4] chore: ruff format regenerated declarative_component_schema.py --- .../models/declarative_component_schema.py | 156 +++++++----------- 1 file changed, 61 insertions(+), 95 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index ddd3160f6..2d4fa79f1 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -856,18 +856,14 @@ class Scope(BaseModel): class Config: extra = Extra.allow - scope: str = Field( - ..., description="The OAuth scope string to request from the provider." - ) + scope: str = Field(..., description="The OAuth scope string to request from the provider.") class OptionalScope(BaseModel): class Config: extra = Extra.allow - scope: str = Field( - ..., description="The OAuth scope string to request from the provider." - ) + scope: str = Field(..., description="The OAuth scope string to request from the provider.") class ScopesJoinStrategy(Enum): @@ -1004,28 +1000,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1043,9 +1035,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1557,9 +1547,7 @@ class CustomConfigTransformation(BaseModel): class_name: str = Field( ..., description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_..`.", - examples=[ - "source_declarative_manifest.components.MyCustomConfigTransformation" - ], + examples=["source_declarative_manifest.components.MyCustomConfigTransformation"], ) parameters: Optional[Dict[str, Any]] = Field( None, @@ -1987,9 +1975,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -2237,9 +2223,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( None, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -2281,12 +2265,10 @@ class DpathValidator(BaseModel): ], title="Field Path", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( - Field( - ..., - description="The condition that the specified config value will be evaluated against", - title="Validation Strategy", - ) + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( + ..., + description="The condition that the specified config value will be evaluated against", + title="Validation Strategy", ) @@ -2303,12 +2285,10 @@ class PredicateValidator(BaseModel): ], title="Value", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( - Field( - ..., - description="The validation strategy to apply to the value.", - title="Validation Strategy", - ) + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( + ..., + description="The validation strategy to apply to the value.", + title="Validation Strategy", ) @@ -2333,12 +2313,12 @@ class ConfigAddFields(BaseModel): class CompositeErrorHandler(BaseModel): type: Literal["CompositeErrorHandler"] - error_handlers: List[ - Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler] - ] = Field( - ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", + error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = ( + Field( + ..., + description="List of error handlers to iterate on to determine how to handle a failed response.", + title="Error Handlers", + ) ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2381,9 +2361,7 @@ class Config: extra = Extra.allow type: Literal["ZipfileDecoder"] - decoder: Union[ - CsvDecoder, GzipDecoder, JsonDecoder, JsonItemsDecoder, JsonlDecoder - ] = Field( + decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonItemsDecoder, JsonlDecoder] = Field( ..., description="Parser to parse the decompressed data from the zipfile(s).", title="Parser", @@ -2507,9 +2485,9 @@ class Config: type: Literal["DeclarativeSource"] check: Union[CheckStream, CheckDynamicStream] - streams: Optional[ - List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]] - ] = None + streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = ( + None + ) dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., @@ -2639,20 +2617,16 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" - ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) primary_key: Optional[PrimaryKey] = Field("", title="Primary Key") schema_loader: Optional[ @@ -2826,20 +2800,18 @@ class HttpRequester(BaseModelWithDeprecations): description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.", title="Query Properties", ) - request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = ( - Field( - None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", - examples=[ - {"unit": "day"}, - { - "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, - ], - title="Query Parameters", - ) + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( + None, + description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + examples=[ + {"unit": "day"}, + { + "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + }, + {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, + {"sort_by[asc]": "updated_at"}, + ], + title="Query Parameters", ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, @@ -3011,9 +2983,7 @@ class QueryProperties(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field( - ..., description="The stream name.", example=["Users"], title="Name" - ) + name: str = Field(..., description="The stream name.", example=["Users"], title="Name") full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -3107,17 +3077,13 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - download_target_extractor: Optional[ - Union[DpathExtractor, CustomRecordExtractor] - ] = Field( + download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", From fe89f602329e3030b7bf4434dcc31194f67eab3c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 14 May 2026 20:41:32 +0000 Subject: [PATCH 4/4] fix: restore models/declarative_component_schema.py to main + JsonItemsDecoder The earlier regeneration via `poe assemble` produced datamodel-code-generator drift unrelated to this PR (Optional[conint(ge=1)] instead of Optional[int] + ge=1 kwarg, removed ScopesJoinStrategy, reordered classes, whitespace in descriptions). That drift broke mypy on Python 3.13. Reset the generated file to match main and add only the new `JsonItemsDecoder` Pydantic class manually, mirroring the style of `JsonDecoder` / `JsonlDecoder`. --- .../models/declarative_component_schema.py | 203 +++++++++--------- 1 file changed, 100 insertions(+), 103 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 2d4fa79f1..2a7e9fabb 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -6,7 +6,7 @@ from enum import Enum from typing import Any, Dict, List, Literal, Optional, Union -from pydantic.v1 import BaseModel, Extra, Field, conint +from pydantic.v1 import BaseModel, Extra, Field from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import ( BaseModelWithDeprecations, @@ -18,6 +18,12 @@ class AuthFlowType(Enum): oauth1_0 = "oauth1.0" +class ScopesJoinStrategy(Enum): + space = "space" + comma = "comma" + plus = "plus" + + class BasicHttpAuthenticator(BaseModel): type: Literal["BasicHttpAuthenticator"] username: str = Field( @@ -51,9 +57,10 @@ class DynamicStreamCheckConfig(BaseModel): dynamic_stream_name: str = Field( ..., description="The dynamic stream name.", title="Dynamic Stream Name" ) - stream_count: Optional[conint(ge=1)] = Field( + stream_count: Optional[int] = Field( None, description="The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. Must be a positive integer; if it exceeds the total number of available streams, all streams are checked.", + ge=1, title="Stream Count", ) @@ -482,7 +489,7 @@ class Config: ) weight: Optional[Union[int, str]] = Field( None, - description="The weight of a request matching this matcher when acquiring a call from the rate limiter. Different endpoints can consume different amounts from a shared budget by specifying different weights. If not set, each request counts as 1.\n", + description="The weight of a request matching this matcher when acquiring a call from the rate limiter. Different endpoints can consume different amounts from a shared budget by specifying different weights. If not set, each request counts as 1.", title="Weight", ) @@ -497,32 +504,6 @@ class OnNoRecords(Enum): emit_parent = "emit_parent" -class RecordExpander(BaseModel): - type: Literal["RecordExpander"] - expand_records_from_field: List[str] = Field( - ..., - description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.", - examples=[ - ["lines", "data"], - ["items"], - ["nested", "array"], - ["sections", "*", "items"], - ], - title="Expand Records From Field", - ) - remain_original_record: Optional[bool] = Field( - False, - description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.', - title="Remain Original Record", - ) - on_no_records: Optional[OnNoRecords] = Field( - OnNoRecords.skip, - description='Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.', - title="On No Records", - ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class ExponentialBackoffStrategy(BaseModel): type: Literal["ExponentialBackoffStrategy"] factor: Optional[Union[float, str]] = Field( @@ -852,32 +833,22 @@ class NoPagination(BaseModel): type: Literal["NoPagination"] -class Scope(BaseModel): - class Config: - extra = Extra.allow - - scope: str = Field(..., description="The OAuth scope string to request from the provider.") - - -class OptionalScope(BaseModel): +class State(BaseModel): class Config: extra = Extra.allow - scope: str = Field(..., description="The OAuth scope string to request from the provider.") - - -class ScopesJoinStrategy(Enum): - space = "space" - comma = "comma" - plus = "plus" + min: int + max: int -class State(BaseModel): +class OAuthScope(BaseModel): class Config: extra = Extra.allow - min: int - max: int + scope: str = Field( + ..., + description="The OAuth scope string to request from the provider.", + ) class OauthConnectorInputSpecification(BaseModel): @@ -899,13 +870,17 @@ class Config: examples=["user:read user:read_orders workspaces:read"], title="Scopes", ) - scopes: Optional[List[Scope]] = Field( + # NOTE: scopes, optional_scopes, and scopes_join_strategy are processed by the + # platform OAuth handler (DeclarativeOAuthSpecHandler.kt), not by the CDK runtime. + # The CDK schema defines the manifest contract; the platform reads these fields + # during the OAuth consent flow to build the authorization URL. + scopes: Optional[List[OAuthScope]] = Field( None, description="List of OAuth scope objects. When present, takes precedence over the `scope` string property.\nThe scope values are joined using the `scopes_join_strategy` (default: space) before being\nsent to the OAuth provider.", examples=[[{"scope": "user:read"}, {"scope": "user:write"}]], title="Scopes", ) - optional_scopes: Optional[List[OptionalScope]] = Field( + optional_scopes: Optional[List[OAuthScope]] = Field( None, description="Optional OAuth scope objects that may or may not be granted.", examples=[[{"scope": "admin:read"}]], @@ -1278,14 +1253,7 @@ class AsyncJobStatusMap(BaseModel): completed: List[str] failed: List[str] timeout: List[str] - skipped: Optional[List[str]] = Field( - None, - description="Statuses that indicate the job was skipped because there is no data to return. Jobs with these statuses will not be retried and no records will be fetched.", - ) - - -class BlockSimultaneousSyncsAction(BaseModel): - type: Literal["BlockSimultaneousSyncsAction"] + skipped: Optional[List[str]] = None class ValueType(Enum): @@ -2126,23 +2094,28 @@ class DefaultPaginator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class DpathExtractor(BaseModel): - type: Literal["DpathExtractor"] - field_path: List[str] = Field( +class RecordExpander(BaseModel): + type: Literal["RecordExpander"] + expand_records_from_field: List[str] = Field( ..., - description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', + description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.", examples=[ - ["data"], - ["data", "records"], - ["data", "{{ parameters.name }}"], - ["data", "*", "record"], + ["lines", "data"], + ["items"], + ["nested", "array"], + ["sections", "*", "items"], ], - title="Field Path", + title="Expand Records From Field", ) - record_expander: Optional[RecordExpander] = Field( - None, - description="Optional component to expand records by extracting items from nested array fields.", - title="Record Expander", + remain_original_record: Optional[bool] = Field( + False, + description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.', + title="Remain Original Record", + ) + on_no_records: Optional[OnNoRecords] = Field( + OnNoRecords.skip, + description='Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.', + title="On No Records", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2215,27 +2188,6 @@ class ListPartitionRouter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class RecordSelector(BaseModel): - type: Literal["RecordSelector"] - extractor: Union[DpathExtractor, CustomRecordExtractor] - record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( - None, - description="Responsible for filtering records to be emitted by the Source.", - title="Record Filter", - ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( - None, - description="Responsible for normalization according to the schema.", - title="Schema Normalization", - ) - transform_before_filtering: Optional[bool] = Field( - None, - description="If true, transformation will be applied before record filtering.", - title="Transform Before Filtering", - ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class PaginationReset(BaseModel): type: Literal["PaginationReset"] action: Action1 @@ -2244,7 +2196,7 @@ class PaginationReset(BaseModel): class GzipDecoder(BaseModel): type: Literal["GzipDecoder"] - decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonItemsDecoder, JsonlDecoder] + decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder] class RequestBodyGraphQL(BaseModel): @@ -2356,18 +2308,60 @@ class Config: ) +class DpathExtractor(BaseModel): + type: Literal["DpathExtractor"] + field_path: List[str] = Field( + ..., + description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', + examples=[ + ["data"], + ["data", "records"], + ["data", "{{ parameters.name }}"], + ["data", "*", "record"], + ], + title="Field Path", + ) + record_expander: Optional[RecordExpander] = Field( + None, + description="Optional component to expand records by extracting items from nested array fields.", + title="Record Expander", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ZipfileDecoder(BaseModel): class Config: extra = Extra.allow type: Literal["ZipfileDecoder"] - decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonItemsDecoder, JsonlDecoder] = Field( + decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder] = Field( ..., description="Parser to parse the decompressed data from the zipfile(s).", title="Parser", ) +class RecordSelector(BaseModel): + type: Literal["RecordSelector"] + extractor: Union[DpathExtractor, CustomRecordExtractor] + record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( + None, + description="Responsible for filtering records to be emitted by the Source.", + title="Record Filter", + ) + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + None, + description="Responsible for normalization according to the schema.", + title="Schema Normalization", + ) + transform_before_filtering: Optional[bool] = Field( + None, + description="If true, transformation will be applied before record filtering.", + title="Transform Before Filtering", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ConfigMigration(BaseModel): type: Literal["ConfigMigration"] description: Optional[str] = Field( @@ -2460,7 +2454,7 @@ class Config: api_budget: Optional[HTTPAPIBudget] = None stream_groups: Optional[Dict[str, StreamGroup]] = Field( None, - description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.\n", + description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.", title="Stream Groups", ) max_concurrent_async_job_count: Optional[Union[int, str]] = Field( @@ -2500,7 +2494,7 @@ class Config: api_budget: Optional[HTTPAPIBudget] = None stream_groups: Optional[Dict[str, StreamGroup]] = Field( None, - description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.\n", + description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.", title="Stream Groups", ) max_concurrent_async_job_count: Optional[Union[int, str]] = Field( @@ -2996,7 +2990,7 @@ class StateDelegatingStream(BaseModel): ) api_retention_period: Optional[str] = Field( None, - description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n* **PT1H**: 1 hour\n* **P1D**: 1 day\n* **P1W**: 1 week\n* **P1M**: 1 month\n* **P1Y**: 1 year\n* **P30D**: 30 days\n", + description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n * **PT1H**: 1 hour\n * **P1D**: 1 day\n * **P1W**: 1 week\n * **P1M**: 1 month\n * **P1Y**: 1 year\n * **P30D**: 30 days\n", examples=["P30D", "P90D", "P1Y"], title="API Retention Period", ) @@ -3012,7 +3006,6 @@ class SimpleRetriever(BaseModel): decoder: Optional[ Union[ JsonDecoder, - JsonItemsDecoder, XmlDecoder, CsvDecoder, JsonlDecoder, @@ -3096,7 +3089,7 @@ class AsyncRetriever(BaseModel): None, description="The time in minutes after which the single Async Job should be considered as Timed Out.", ) - failed_retry_wait_time_in_seconds: Optional[Union[conint(ge=1), str]] = Field( + failed_retry_wait_time_in_seconds: Optional[Union[int, str]] = Field( None, description="Time in seconds to wait before retrying a failed async job. Only applies to jobs that ran on the API side and reported a FAILED status (e.g. report generation failed due to a cooldown). Creation failures (HTTP errors when starting a job, such as 429s) and TIMED_OUT jobs are retried immediately and are not affected by this setting. When set, the orchestrator defers retry of real failed jobs until the wait time has elapsed, without blocking other jobs.", ) @@ -3145,7 +3138,6 @@ class AsyncRetriever(BaseModel): CsvDecoder, GzipDecoder, JsonDecoder, - JsonItemsDecoder, JsonlDecoder, IterableDecoder, XmlDecoder, @@ -3162,7 +3154,6 @@ class AsyncRetriever(BaseModel): CsvDecoder, GzipDecoder, JsonDecoder, - JsonItemsDecoder, JsonlDecoder, IterableDecoder, XmlDecoder, @@ -3177,14 +3168,20 @@ class AsyncRetriever(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class BlockSimultaneousSyncsAction(BaseModel): + type: Literal["BlockSimultaneousSyncsAction"] + + class StreamGroup(BaseModel): - streams: List[DeclarativeStream] = Field( + streams: List[str] = Field( ..., - description="List of references to streams that belong to this group.\n", + description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").', title="Streams", ) action: BlockSimultaneousSyncsAction = Field( - ..., description="The action to apply to streams in this group.", title="Action" + ..., + description="The action to apply to streams in this group.", + title="Action", )