From 65a83c506691e8ead12ed2f9e9cece719a4dca17 Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Mon, 6 Apr 2026 23:44:04 +0200 Subject: [PATCH 01/18] feat(ml): add qdrant ingestion refactor: use local qdrant implementation for tests chore: clean up imports chore: add qdrant dependency to ml_test extra chore: run precommit chore: add comment to CHANGES.md fix: guard against import error fix: import --- CHANGES.md | 1 + .../apache_beam/ml/rag/ingestion/qdrant.py | 163 ++++++++++ .../ml/rag/ingestion/qdrant_it_test.py | 279 ++++++++++++++++++ sdks/python/setup.py | 10 +- 4 files changed, 449 insertions(+), 4 deletions(-) create mode 100644 sdks/python/apache_beam/ml/rag/ingestion/qdrant.py create mode 100644 sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py diff --git a/CHANGES.md b/CHANGES.md index 74209bb7499c..db2015319fc5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -153,6 +153,7 @@ * Updates minimum Go version to 1.26.1 ([#37897](https://github.com/apache/beam/issues/37897)). * (Python) Added image embedding support in `apache_beam.ml.rag` package ([#37628](https://github.com/apache/beam/issues/37628)). * (Python) Added support for Python version 3.14 ([#37247](https://github.com/apache/beam/issues/37247)). +* (Python) Added [Qdrant](https://qdrant.tech/) VectorDatabaseWriteConfig implementation ([#38141](https://github.com/apache/beam/issues/38141)). ## Breaking Changes diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py new file mode 100644 index 000000000000..636ef139b2c1 --- /dev/null +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py @@ -0,0 +1,163 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, Optional + +try: + from qdrant_client import QdrantClient, models +except ImportError: + logging.warning("Qdrant client library is not installed.") + +import apache_beam as beam +from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteConfig +from apache_beam.ml.rag.types import EmbeddableItem + +DEFAULT_WRITE_BATCH_SIZE = 1000 + + +@dataclass +class QdrantConnectionParameters: + location: Optional[str] = None + url: Optional[str] = None + port: Optional[int] = 6333 + grpc_port: int = 6334 + prefer_grpc: bool = False + https: Optional[bool] = None + api_key: Optional[str] = None + prefix: Optional[str] = None + timeout: Optional[int] = None + host: Optional[str] = None + path: Optional[str] = None + kwargs: Dict[str, Any] = field(default_factory=dict) + + def __post_init__(self): + if not (self.location or self.url or self.host or self.path): + raise ValueError( + "One of location, url, host, or path must be provided for Qdrant") + + +@dataclass +class QdrantWriteConfig(VectorDatabaseWriteConfig): + connection_params: QdrantConnectionParameters + collection_name: str + timeout: Optional[float] = None + batch_size: int = DEFAULT_WRITE_BATCH_SIZE + kwargs: Dict[str, Any] = field(default_factory=dict) + dense_embedding_key: str = "dense" + sparse_embedding_key: str = "sparse" + + def __post_init__(self): + if not self.collection_name: + raise ValueError("Collection name must be provided") + + def create_write_transform(self) -> beam.PTransform[EmbeddableItem, Any]: + return _QdrantWriteTransform(self) + + def create_converter( + self) -> Callable[[EmbeddableItem], 'models.PointStruct']: + def convert(item: EmbeddableItem) -> 'models.PointStruct': + if item.dense_embedding is None and item.sparse_embedding is None: + raise ValueError( + "EmbeddableItem must have at least one embedding (dense or sparse)") + vector = {} + if item.dense_embedding is not None: + vector[self.dense_embedding_key] = item.dense_embedding + if item.sparse_embedding is not None: + sparse_indices, sparse_values = item.sparse_embedding + vector[self.sparse_embedding_key] = models.SparseVector( + indices=sparse_indices, + values=sparse_values, + ) + id = ( + int(item.id) + if isinstance(item.id, str) and item.id.isdigit() else item.id) + return models.PointStruct( + id=id, + vector=vector, + payload=item.metadata if item.metadata else None, + ) + + return convert + + +class _QdrantWriteTransform(beam.PTransform): + def __init__(self, config: QdrantWriteConfig): + self.config = config + + def expand(self, input_or_inputs: beam.PCollection[EmbeddableItem]): + return ( + input_or_inputs + | "Convert to Records" >> beam.Map(self.config.create_converter()) + | beam.ParDo(_QdrantWriteFn(self.config))) + + +class _QdrantWriteFn(beam.DoFn): + def __init__(self, config: QdrantWriteConfig): + self.config = config + self._batch = [] + self._client: 'Optional[QdrantClient]' = None + + def process(self, element, *args, **kwargs): + self._batch.append(element) + if len(self._batch) >= self.config.batch_size: + self._flush() + + def setup(self): + params = self.config.connection_params + self._client = QdrantClient( + location=params.location, + url=params.url, + port=params.port, + grpc_port=params.grpc_port, + prefer_grpc=params.prefer_grpc, + https=params.https, + api_key=params.api_key, + prefix=params.prefix, + timeout=params.timeout, + host=params.host, + path=params.path, + check_compatibility=False, + **params.kwargs, + ) + + def teardown(self): + if self._client: + self._client.close() + self._client = None + + def finish_bundle(self): + self._flush() + + def _flush(self): + if len(self._batch) == 0: + return + if not self._client: + raise RuntimeError("Qdrant client is not initialized") + self._client.upsert( + collection_name=self.config.collection_name, + points=self._batch, + timeout=self.config.timeout, + **self.config.kwargs, + ) + self._batch = [] + + def display_data(self): + res = super().display_data() + res["collection"] = self.config.collection_name + res["batch_size"] = self.config.batch_size + return res diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py new file mode 100644 index 000000000000..2e035e083308 --- /dev/null +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py @@ -0,0 +1,279 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import tempfile +import unittest + +import apache_beam as beam +from apache_beam.ml.rag.ingestion.qdrant import QdrantConnectionParameters +from apache_beam.ml.rag.ingestion.qdrant import QdrantWriteConfig +from apache_beam.ml.rag.types import Content +from apache_beam.ml.rag.types import EmbeddableItem +from apache_beam.ml.rag.types import Embedding +from apache_beam.testing.test_pipeline import TestPipeline + +# pylint: disable=ungrouped-imports +try: + from qdrant_client import QdrantClient, models + QDRANT_AVAILABLE = True +except ImportError: + QDRANT_AVAILABLE = False +# pylint: enable=ungrouped-imports + +TEST_CORPUS = [ + EmbeddableItem( + id="1", + content=Content(text="Test document one"), + metadata={"source": "test1"}, + embedding=Embedding(dense_embedding=[1.0, 0.0]), + ), + EmbeddableItem( + id="2", + content=Content(text="Test document two"), + metadata={"source": "test2"}, + embedding=Embedding(dense_embedding=[0.0, 1.0]), + ), + EmbeddableItem( + id="3", + content=Content(text="Test document three"), + metadata={"source": "test3"}, + embedding=Embedding(dense_embedding=[-1.0, 0.0]), + ), +] + + +@unittest.skipIf(not QDRANT_AVAILABLE, "qdrant dependencies not installed.") +class TestQdrantIngestion(unittest.TestCase): + @contextlib.contextmanager + def qdrant_client(self) -> 'QdrantClient': + client = QdrantClient(path=self._temp_dir.name) + try: + yield client + finally: + client.close() + + def setUp(self): + self._temp_dir = tempfile.TemporaryDirectory() + self._collection_name = f"test_collection_{self._testMethodName}" + + with self.qdrant_client() as client: + client.create_collection( + collection_name=self._collection_name, + vectors_config={ + "dense": models.VectorParams( + size=2, distance=models.Distance.COSINE) + }, + sparse_vectors_config={"sparse": models.SparseVectorParams()}, + ) + assert client.collection_exists(collection_name=self._collection_name) + + self._connection_params = QdrantConnectionParameters( + path=self._temp_dir.name) + + def tearDown(self): + self._temp_dir.cleanup() + + def test_write_on_non_existent_collection(self): + non_existent = "nonexistent_collection" + write_config = QdrantWriteConfig( + connection_params=self._connection_params, + collection_name=non_existent, + batch_size=1, + ) + + with self.assertRaises(Exception): + with TestPipeline() as p: + _ = p | beam.Create(TEST_CORPUS) | write_config.create_write_transform() + + def test_write_dense_embeddings_only(self): + write_config = QdrantWriteConfig( + connection_params=self._connection_params, + collection_name=self._collection_name, + batch_size=len(TEST_CORPUS), + ) + + with TestPipeline() as p: + _ = p | beam.Create(TEST_CORPUS) | write_config.create_write_transform() + + with self.qdrant_client() as client: + count_result = client.count(collection_name=self._collection_name) + self.assertEqual(count_result.count, len(TEST_CORPUS)) + + points, _ = client.scroll( + collection_name=self._collection_name, + limit=100, + with_payload=True, + with_vectors=True, + ) + points_by_id = {p.id: p for p in points} + + for item in TEST_CORPUS: + expected_record = models.Record( + id=int(item.id), + vector={"dense": item.dense_embedding}, + payload=item.metadata, + ) + self.assertEqual(expected_record, points_by_id[int(item.id)]) + + def test_write_sparse_embeddings_only(self): + sparse_corpus = [ + EmbeddableItem( + id="1", + content=Content(text="Sparse doc one"), + metadata={"source": "sparse1"}, + embedding=Embedding(sparse_embedding=([0, 1, 2], [0.1, 0.2, 0.3])), + ), + EmbeddableItem( + id="2", + content=Content(text="Sparse doc two"), + metadata={"source": "sparse2"}, + embedding=Embedding(sparse_embedding=([1, 3, 5], [0.4, 0.5, 0.6])), + ), + ] + + write_config = QdrantWriteConfig( + connection_params=self._connection_params, + collection_name=self._collection_name, + batch_size=len(sparse_corpus), + ) + + with TestPipeline() as p: + _ = p | beam.Create(sparse_corpus) | write_config.create_write_transform() + + with self.qdrant_client() as client: + count_result = client.count(collection_name=self._collection_name) + self.assertEqual(count_result.count, len(sparse_corpus)) + + points, _ = client.scroll( + collection_name=self._collection_name, + limit=100, + with_payload=True, + with_vectors=True, + ) + points_by_id = {p.id: p for p in points} + + for item in sparse_corpus: + expected_record = models.Record( + id=int(item.id), + vector={ + "sparse": models.SparseVector( + indices=item.sparse_embedding[0], + values=item.sparse_embedding[1], + ) + }, + payload=item.metadata, + ) + self.assertEqual(expected_record, points_by_id[int(item.id)]) + + def test_write_both_dense_and_sparse(self): + hybrid_corpus = [ + EmbeddableItem( + id="1", + content=Content(text="Hybrid doc one"), + metadata={"source": "hybrid1"}, + embedding=Embedding( + dense_embedding=[1.0, 0.0], + sparse_embedding=([0, 1], [0.1, 0.2])), + ), + EmbeddableItem( + id="2", + content=Content(text="Hybrid doc two"), + metadata={"source": "hybrid2"}, + embedding=Embedding( + dense_embedding=[0.0, 1.0], + sparse_embedding=([2, 3], [0.3, 0.4])), + ), + ] + + write_config = QdrantWriteConfig( + connection_params=self._connection_params, + collection_name=self._collection_name, + batch_size=len(hybrid_corpus), + ) + + with TestPipeline() as p: + _ = p | beam.Create(hybrid_corpus) | write_config.create_write_transform() + + with self.qdrant_client() as client: + count_result = client.count(collection_name=self._collection_name) + self.assertEqual(count_result.count, len(hybrid_corpus)) + + points, _ = client.scroll( + collection_name=self._collection_name, + limit=100, + with_payload=True, + with_vectors=True, + ) + points_by_id = {p.id: p for p in points} + + for item in hybrid_corpus: + expected_record = models.Record( + id=int(item.id), + vector={ + "dense": item.dense_embedding, + "sparse": models.SparseVector( + indices=item.sparse_embedding[0], + values=item.sparse_embedding[1]), + }, + payload=item.metadata, + ) + self.assertEqual(expected_record, points_by_id[int(item.id)]) + + def test_write_with_batching(self): + batch_corpus = [ + EmbeddableItem( + id=str(i), + content=Content(text=f"Batch doc {i}"), + metadata={"batch_id": i}, + embedding=Embedding(dense_embedding=[1.0, 0.0]), + ) for i in range(1, 8) + ] + + write_config = QdrantWriteConfig( + connection_params=self._connection_params, + collection_name=self._collection_name, + batch_size=3, + ) + + with TestPipeline() as p: + _ = p | beam.Create(batch_corpus) | write_config.create_write_transform() + + with self.qdrant_client() as client: + count_result = client.count(collection_name=self._collection_name) + self.assertEqual(count_result.count, len(batch_corpus)) + + points, _ = client.scroll( + collection_name=self._collection_name, + limit=100, + with_payload=True, + with_vectors=True, + ) + points_by_id = {p.id: p for p in points} + + for item in batch_corpus: + expected_record = models.Record( + id=int(item.id), + vector={ + "dense": item.dense_embedding, + }, + payload=item.metadata, + ) + self.assertEqual(expected_record, points_by_id[int(item.id)]) + + +if __name__ == "__main__": + unittest.main() diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 4c1384c31517..f68223ff4ab9 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -166,6 +166,7 @@ def cythonize(*args, **kwargs): ] milvus_dependency = ['pymilvus>=2.5.10,<3.0.0'] +qdrant_dependency = ['qdrant-client>=1.15.0'] # google-adk / OpenTelemetry require protobuf>=5; tensorflow-transform in # ml_test is pinned to versions that require protobuf<5 on Python 3.10. Those @@ -607,14 +608,14 @@ def get_portability_package_data(): 'tf2onnx>=1.16.1,<1.17', ] + ml_base_core, 'p310_ml_test': [ - 'datatable', - ] + ml_base, + 'datatable', + ] + ml_base + qdrant_dependency, 'p312_ml_test': [ 'datatable', - ] + ml_base, + ] + ml_base + qdrant_dependency, # maintainer: milvus tests only run with this extension. Make sure it # is covered by docker-in-docker test when changing py version - 'p313_ml_test': ml_base + milvus_dependency, + 'p313_ml_test': ml_base + milvus_dependency + qdrant_dependency, 'aws': ['boto3>=1.9,<2'], 'azure': [ 'azure-storage-blob>=12.3.2,<13', @@ -685,6 +686,7 @@ def get_portability_package_data(): 'xgboost': ['xgboost>=1.6.0,<2.1.3', 'datatable==1.0.0'], 'tensorflow-hub': ['tensorflow-hub>=0.14.0,<0.16.0'], 'milvus': milvus_dependency, + 'qdrant': qdrant_dependency, 'vllm': ['openai==1.107.1', 'vllm==0.10.1.1', 'triton==3.3.1'] }, zip_safe=False, From d82df270ee3d9742d0b8d1951e83add32aa9deaf Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Wed, 29 Apr 2026 21:22:29 +0200 Subject: [PATCH 02/18] fix: typing --- sdks/python/apache_beam/ml/rag/ingestion/qdrant.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py index 636ef139b2c1..45eb48055bb5 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py @@ -15,8 +15,9 @@ # limitations under the License. import logging +from collections.abc import Callable from dataclasses import dataclass, field -from typing import Any, Callable, Dict, Optional +from typing import Any, Optional try: from qdrant_client import QdrantClient, models @@ -43,7 +44,7 @@ class QdrantConnectionParameters: timeout: Optional[int] = None host: Optional[str] = None path: Optional[str] = None - kwargs: Dict[str, Any] = field(default_factory=dict) + kwargs: dict[str, Any] = field(default_factory=dict) def __post_init__(self): if not (self.location or self.url or self.host or self.path): @@ -57,7 +58,7 @@ class QdrantWriteConfig(VectorDatabaseWriteConfig): collection_name: str timeout: Optional[float] = None batch_size: int = DEFAULT_WRITE_BATCH_SIZE - kwargs: Dict[str, Any] = field(default_factory=dict) + kwargs: dict[str, Any] = field(default_factory=dict) dense_embedding_key: str = "dense" sparse_embedding_key: str = "sparse" From 2fe85ee8d8e9a2dc7e941e28d9ac94bb0f402397 Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Wed, 29 Apr 2026 22:05:17 +0200 Subject: [PATCH 03/18] chore: add docstring --- .../apache_beam/ml/rag/ingestion/qdrant.py | 54 +++++++++++++++++-- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py index 45eb48055bb5..ff86cd07ef27 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py @@ -33,6 +33,38 @@ @dataclass class QdrantConnectionParameters: + """Configuration parameters for connecting to Qdrant service. + + Either `location`, `url`, `host`, or `path` must be provided to establish + a connection. + + Args: + location: + If `str` - use it as a `url` parameter. + If `None` - use default values for `host` and `port`. + url: either host or str of "//:/". + Default: `None` + port: Port of the REST API interface. Default: 6333 + grpc_port: Port of the gRPC interface. Default: 6334 + prefer_grpc: If `true` - use gPRC interface whenever possible. + https: If `true` - use HTTPS(SSL) protocol. Default: `None` + api_key: API key for authentication in Qdrant Cloud. Default: `None` + prefix: + If not `None` - add `prefix` to the REST URL path. + Example: `service/v1` will result in + `http://localhost:6333/service/v1/{qdrant-endpoint}` for REST API. + Default: `None` + timeout: + Timeout for REST and gRPC API requests. + Default: 5 seconds for REST and unlimited for gRPC + host: + Host name of Qdrant service. + If url and host are None, set to 'localhost'. + Default: `None` + path: Persistence path for QdrantLocal. Default: `None` + **kwargs: Additional arguments passed directly into client initialization + """ + location: Optional[str] = None url: Optional[str] = None port: Optional[int] = 6333 @@ -54,6 +86,21 @@ def __post_init__(self): @dataclass class QdrantWriteConfig(VectorDatabaseWriteConfig): + """Configuration for writing to Qdrant vector database. + + This class defines the parameters needed to write data to a qdrant collection, + including collection targeting, batching behavior, and operation timeouts. + + Args: + connection_params: QdrantConnectionParameters with connection settings. + collection_name: Name of the Qdrant collection to write to. + timeout: Optional timeout for write operations in seconds. Default is None. + batch_size: Number of points to write in each batch. Default is 1000. + kwargs: Additional keyword arguments to pass to the client's upsert method. + dense_embedding_key: name for the dense vector in the qdrant collection. + sparse_embedding_key: name for the sparse vector in the qdrant collection. + """ + connection_params: QdrantConnectionParameters collection_name: str timeout: Optional[float] = None @@ -70,8 +117,9 @@ def create_write_transform(self) -> beam.PTransform[EmbeddableItem, Any]: return _QdrantWriteTransform(self) def create_converter( - self) -> Callable[[EmbeddableItem], 'models.PointStruct']: - def convert(item: EmbeddableItem) -> 'models.PointStruct': + self, + ) -> Callable[[EmbeddableItem], "models.PointStruct"]: + def convert(item: EmbeddableItem) -> "models.PointStruct": if item.dense_embedding is None and item.sparse_embedding is None: raise ValueError( "EmbeddableItem must have at least one embedding (dense or sparse)") @@ -111,7 +159,7 @@ class _QdrantWriteFn(beam.DoFn): def __init__(self, config: QdrantWriteConfig): self.config = config self._batch = [] - self._client: 'Optional[QdrantClient]' = None + self._client: "Optional[QdrantClient]" = None def process(self, element, *args, **kwargs): self._batch.append(element) From 46a08c858b5982afa2194f49112134bb00b9f044 Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Wed, 29 Apr 2026 22:08:58 +0200 Subject: [PATCH 04/18] chore: move change to 2.74.0 release notes --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index db2015319fc5..c1b6232d8dd5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -116,6 +116,7 @@ * (Python) Supported Python user type in Beam SQL. For example, SQL statements like `SELECT some_field from PCOLLECTION` can now operate a PCollection of Beam Row containing pickable Python user type ([#20738](https://github.com/apache/beam/issues/20738)). * (Python) Introduced `beam.coders.registry.register_row` as preferred API to register a named tuple or dataclass with a Beam Row. At pipelne runtime, the original type associated with the registered row are preserved across the serialization boundary ([#38108](https://github.com/apache/beam/issues/38108)). * (Python) Added `type_overrides` parameter to `WriteToBigQuery` allowing users to specify custom BigQuery to Python type mappings when using Storage Write API. This enables support for types like DATE, DATETIME, and JSON (Python) ([#25946](https://github.com/apache/beam/issues/25946)). +* (Python) Added [Qdrant](https://qdrant.tech/) VectorDatabaseWriteConfig implementation ([#38141](https://github.com/apache/beam/issues/38141)). ## Breaking Changes @@ -153,7 +154,6 @@ * Updates minimum Go version to 1.26.1 ([#37897](https://github.com/apache/beam/issues/37897)). * (Python) Added image embedding support in `apache_beam.ml.rag` package ([#37628](https://github.com/apache/beam/issues/37628)). * (Python) Added support for Python version 3.14 ([#37247](https://github.com/apache/beam/issues/37247)). -* (Python) Added [Qdrant](https://qdrant.tech/) VectorDatabaseWriteConfig implementation ([#38141](https://github.com/apache/beam/issues/38141)). ## Breaking Changes From 06043c3a89797eab7793bef0a6d11689597a2abf Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Sat, 9 May 2026 11:23:13 +0200 Subject: [PATCH 05/18] fix: move batch initialization for qdrant sink into start_bundle --- sdks/python/apache_beam/ml/rag/ingestion/qdrant.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py index ff86cd07ef27..601d9bf16209 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py @@ -158,9 +158,11 @@ def expand(self, input_or_inputs: beam.PCollection[EmbeddableItem]): class _QdrantWriteFn(beam.DoFn): def __init__(self, config: QdrantWriteConfig): self.config = config - self._batch = [] self._client: "Optional[QdrantClient]" = None + def start_bundle(self): + self._batch = [] + def process(self, element, *args, **kwargs): self._batch.append(element) if len(self._batch) >= self.config.batch_size: From db3ee8d70dee5662d0ffb79984f82dbe6c3f34c7 Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Sat, 9 May 2026 12:58:44 +0200 Subject: [PATCH 06/18] feat: add byte size limit for qdrant ingestion --- .../apache_beam/ml/rag/ingestion/qdrant.py | 21 +++++++-- .../ml/rag/ingestion/qdrant_it_test.py | 45 +++++++++++++++++++ 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py index 601d9bf16209..d71ee289b549 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py @@ -19,6 +19,8 @@ from dataclasses import dataclass, field from typing import Any, Optional +from objsize import get_deep_size + try: from qdrant_client import QdrantClient, models except ImportError: @@ -29,6 +31,7 @@ from apache_beam.ml.rag.types import EmbeddableItem DEFAULT_WRITE_BATCH_SIZE = 1000 +DEFAULT_MAX_BATCH_BYTE_SIZE = 4 << 20 @dataclass @@ -103,8 +106,9 @@ class QdrantWriteConfig(VectorDatabaseWriteConfig): connection_params: QdrantConnectionParameters collection_name: str - timeout: Optional[float] = None + timeout: Optional[int] = None batch_size: int = DEFAULT_WRITE_BATCH_SIZE + max_batch_byte_size: int = DEFAULT_MAX_BATCH_BYTE_SIZE kwargs: dict[str, Any] = field(default_factory=dict) dense_embedding_key: str = "dense" sparse_embedding_key: str = "sparse" @@ -162,11 +166,18 @@ def __init__(self, config: QdrantWriteConfig): def start_bundle(self): self._batch = [] + self._batch_byte_size = 0 def process(self, element, *args, **kwargs): - self._batch.append(element) - if len(self._batch) >= self.config.batch_size: + element_byte_size = get_deep_size(element) + new_batch_byte_size = self._batch_byte_size + element_byte_size + + is_batch_full = len(self._batch) >= self.config.batch_size + is_batch_too_large = new_batch_byte_size > self.config.max_batch_byte_size + if (is_batch_full or is_batch_too_large): self._flush() + self._batch.append(element) + self._batch_byte_size += element_byte_size def setup(self): params = self.config.connection_params @@ -195,7 +206,7 @@ def finish_bundle(self): self._flush() def _flush(self): - if len(self._batch) == 0: + if not self._batch: return if not self._client: raise RuntimeError("Qdrant client is not initialized") @@ -206,9 +217,11 @@ def _flush(self): **self.config.kwargs, ) self._batch = [] + self._batch_byte_size = 0 def display_data(self): res = super().display_data() res["collection"] = self.config.collection_name res["batch_size"] = self.config.batch_size + res["max_batch_byte_size"] = self.config.max_batch_byte_size return res diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py index 2e035e083308..069fe7e12331 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py @@ -274,6 +274,51 @@ def test_write_with_batching(self): ) self.assertEqual(expected_record, points_by_id[int(item.id)]) + def test_write_with_byte_size_limit(self): + byte_size_corpus = [ + EmbeddableItem( + id=str(i), + content=Content(text=f"Byte size doc {i}"), + metadata={"data": "x" * 9000}, + embedding=Embedding(dense_embedding=[1.0, 0.0]), + ) for i in range(5) + ] + + write_config = QdrantWriteConfig( + connection_params=self._connection_params, + collection_name=self._collection_name, + batch_size=100, + max_batch_byte_size=15_000, + ) + + with TestPipeline() as p: + _ = ( + p + | beam.Create(byte_size_corpus) + | write_config.create_write_transform()) + + with self.qdrant_client() as client: + count_result = client.count(collection_name=self._collection_name) + self.assertEqual(count_result.count, len(byte_size_corpus)) + + points, _ = client.scroll( + collection_name=self._collection_name, + limit=100, + with_payload=True, + with_vectors=True, + ) + points_by_id = {p.id: p for p in points} + + for item in byte_size_corpus: + expected_record = models.Record( + id=int(item.id), + vector={ + "dense": item.dense_embedding, + }, + payload=item.metadata, + ) + self.assertEqual(expected_record, points_by_id[int(item.id)]) + if __name__ == "__main__": unittest.main() From 62f115a7ff664bdf2e66b5f38ae7b96eeba83316 Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Sat, 9 May 2026 13:01:25 +0200 Subject: [PATCH 07/18] fix: add positive batch_size check for qdrant config --- sdks/python/apache_beam/ml/rag/ingestion/qdrant.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py index d71ee289b549..bafb07e8efc7 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py @@ -116,6 +116,8 @@ class QdrantWriteConfig(VectorDatabaseWriteConfig): def __post_init__(self): if not self.collection_name: raise ValueError("Collection name must be provided") + if self.batch_size <= 0: + raise ValueError("Batch size must be a positive integer") def create_write_transform(self) -> beam.PTransform[EmbeddableItem, Any]: return _QdrantWriteTransform(self) From 41d19280d00fa5b8fbfa1cadeee64748a023b092 Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Sat, 9 May 2026 20:18:38 +0200 Subject: [PATCH 08/18] feat: add factory methods for qdrant connection params --- .../apache_beam/ml/rag/ingestion/qdrant.py | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py index bafb07e8efc7..3a92bb11b6a3 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py @@ -86,6 +86,79 @@ def __post_init__(self): raise ValueError( "One of location, url, host, or path must be provided for Qdrant") + @classmethod + def for_cloud( + cls, + url: str, + api_key: str, + *, + prefer_grpc: bool = False, + timeout: Optional[int] = None, + **kwargs: Any, + ) -> "QdrantConnectionParameters": + """Connect to Qdrant Cloud. Requires the cluster URL and an API key.""" + return cls( + url=url, + api_key=api_key, + https=True, + prefer_grpc=prefer_grpc, + timeout=timeout, + kwargs=kwargs, + ) + + @classmethod + def for_host( + cls, + host: str, + port: int = 6333, + *, + grpc_port: int = 6334, + prefer_grpc: bool = False, + https: bool = False, + api_key: Optional[str] = None, + timeout: Optional[int] = None, + **kwargs: Any, + ) -> "QdrantConnectionParameters": + """Connect to a self-hosted Qdrant instance by host and port.""" + return cls( + host=host, + port=port, + grpc_port=grpc_port, + prefer_grpc=prefer_grpc, + https=https, + api_key=api_key, + timeout=timeout, + kwargs=kwargs, + ) + + @classmethod + def for_url( + cls, + url: str, + *, + api_key: Optional[str] = None, + prefer_grpc: bool = False, + timeout: Optional[int] = None, + **kwargs: Any, + ) -> "QdrantConnectionParameters": + """Connect using a full URL like 'https://my-qdrant.example.com:6333'.""" + return cls( + url=url, + api_key=api_key, + prefer_grpc=prefer_grpc, + timeout=timeout, + kwargs=kwargs) + + @classmethod + def local(cls, path: str) -> "QdrantConnectionParameters": + """Use an embedded Qdrant instance persisted to the given path.""" + return cls(path=path) + + @classmethod + def in_memory(cls) -> "QdrantConnectionParameters": + """Use an embedded in-memory Qdrant instance. Useful for tests.""" + return cls(location=":memory:") + @dataclass class QdrantWriteConfig(VectorDatabaseWriteConfig): From f27ebe9c52b072164c5423f9ccfd185f3abe62c8 Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Sat, 9 May 2026 22:32:53 +0200 Subject: [PATCH 09/18] feat: add retry to qdrant ingestion --- .../apache_beam/ml/rag/ingestion/qdrant.py | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py index 3a92bb11b6a3..53b8eff54176 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py @@ -15,14 +15,18 @@ # limitations under the License. import logging +import time from collections.abc import Callable from dataclasses import dataclass, field from typing import Any, Optional +import grpc from objsize import get_deep_size try: from qdrant_client import QdrantClient, models + from qdrant_client.common.client_exceptions import ResourceExhaustedResponse + from qdrant_client.http.exceptions import ResponseHandlingException, UnexpectedResponse except ImportError: logging.warning("Qdrant client library is not installed.") @@ -285,12 +289,28 @@ def _flush(self): return if not self._client: raise RuntimeError("Qdrant client is not initialized") - self._client.upsert( - collection_name=self.config.collection_name, - points=self._batch, - timeout=self.config.timeout, - **self.config.kwargs, - ) + + max_retries = 3 + attempt = 1 + while True: + try: + self._client.upsert( + collection_name=self.config.collection_name, + points=self._batch, + timeout=self.config.timeout, + **self.config.kwargs, + ) + break + except ResourceExhaustedResponse as e: + time.sleep(e.retry_after_s) + # don't count rate-limit against max_retries + continue + except (UnexpectedResponse, ResponseHandlingException, + grpc.RpcError) as e: + if attempt > max_retries: + raise + time.sleep(2**attempt) + attempt += 1 self._batch = [] self._batch_byte_size = 0 From 8d4bb13ed78931eb10cfb733105318d19503ecc6 Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Sat, 9 May 2026 22:33:04 +0200 Subject: [PATCH 10/18] chore: add unit tests --- .../ml/rag/ingestion/qdrant_test.py | 482 ++++++++++++++++++ 1 file changed, 482 insertions(+) create mode 100644 sdks/python/apache_beam/ml/rag/ingestion/qdrant_test.py diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_test.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_test.py new file mode 100644 index 000000000000..3d49cd03e339 --- /dev/null +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_test.py @@ -0,0 +1,482 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from unittest import mock + +try: + from qdrant_client import QdrantClient, models + from qdrant_client.common.client_exceptions import ResourceExhaustedResponse + from qdrant_client.http.exceptions import ( + ResponseHandlingException, + UnexpectedResponse, + ) + + QDRANT_AVAILABLE = True +except ImportError: + QDRANT_AVAILABLE = False + +import grpc +from objsize import get_deep_size + +from apache_beam.ml.rag.ingestion.qdrant import ( + _QdrantWriteFn, + QdrantWriteConfig, + QdrantConnectionParameters, +) +from apache_beam.ml.rag.types import EmbeddableItem, Content, Embedding + + +class TestQdrantConnectionParameters(unittest.TestCase): + def test_no_params_raises_value_error(self): + with self.assertRaises(ValueError): + QdrantConnectionParameters() + + def test_location_is_sufficient(self): + QdrantConnectionParameters(location=":memory:") + + def test_url_is_sufficient(self): + QdrantConnectionParameters(url="http://localhost:6333") + + def test_host_is_sufficient(self): + QdrantConnectionParameters(host="localhost") + + def test_path_is_sufficient(self): + QdrantConnectionParameters(path="/tmp/qdrant") + + +class TestQdrantWriteConfig(unittest.TestCase): + def test_empty_collection_name_raises_value_error(self): + with self.assertRaises(ValueError): + QdrantWriteConfig( + connection_params=QdrantConnectionParameters(location=":memory:"), + collection_name="", + ) + + def test_none_collection_name_raises_value_error(self): + with self.assertRaises(ValueError): + QdrantWriteConfig( + connection_params=QdrantConnectionParameters(location=":memory:"), + collection_name=None, + ) + + def test_batch_size_zero_raises_value_error(self): + with self.assertRaises(ValueError): + QdrantWriteConfig( + connection_params=QdrantConnectionParameters(location=":memory:"), + collection_name="test", + batch_size=0, + ) + + def test_batch_size_negative_raises_value_error(self): + with self.assertRaises(ValueError): + QdrantWriteConfig( + connection_params=QdrantConnectionParameters(location=":memory:"), + collection_name="test", + batch_size=-1, + ) + + def test_display_data(self): + config = QdrantWriteConfig( + connection_params=QdrantConnectionParameters(location=":memory:"), + collection_name="test", + batch_size=100, + max_batch_byte_size=5000, + ) + fn = _QdrantWriteFn(config) + data = fn.display_data() + self.assertEqual(data["collection"], "test") + self.assertEqual(data["batch_size"], 100) + self.assertEqual(data["max_batch_byte_size"], 5000) + + def test_for_cloud_creates_connection(self): + params = QdrantConnectionParameters.for_cloud( + url="https://test.cloud.qdrant.io", + api_key="my-key", + ) + self.assertEqual(params.url, "https://test.cloud.qdrant.io") + self.assertEqual(params.api_key, "my-key") + self.assertTrue(params.https) + + def test_for_host_creates_connection(self): + params = QdrantConnectionParameters.for_host(host="localhost", port=6333) + self.assertEqual(params.host, "localhost") + self.assertEqual(params.port, 6333) + + def test_in_memory_creates_connection(self): + params = QdrantConnectionParameters.in_memory() + self.assertEqual(params.location, ":memory:") + + def test_for_url_creates_connection(self): + params = QdrantConnectionParameters.for_url(url="http://localhost:6333") + self.assertEqual(params.url, "http://localhost:6333") + + def test_kwargs_passthrough(self): + config = QdrantWriteConfig( + connection_params=QdrantConnectionParameters(location=":memory:"), + collection_name="test", + kwargs={"parallel": 4}, + ) + self.assertEqual(config.kwargs, {"parallel": 4}) + + +@unittest.skipIf(not QDRANT_AVAILABLE, "qdrant dependencies not installed.") +class TestQdrantCreateConverter(unittest.TestCase): + def setUp(self): + self.config = QdrantWriteConfig( + connection_params=QdrantConnectionParameters(location=":memory:"), + collection_name="test", + ) + self.convert = self.config.create_converter() + + def test_dense_embedding_only(self): + item = EmbeddableItem( + id="1", + content=Content(text="test"), + embedding=Embedding(dense_embedding=[1.0, 2.0]), + ) + result = self.convert(item) + self.assertIsInstance(result, models.PointStruct) + self.assertEqual(result.id, 1) + self.assertEqual(result.vector, {"dense": [1.0, 2.0]}) + self.assertIsNone(result.payload) + + def test_sparse_embedding_only(self): + item = EmbeddableItem( + id="2", + content=Content(text="test"), + embedding=Embedding(sparse_embedding=([0, 1], [0.5, 0.3])), + ) + result = self.convert(item) + self.assertIsInstance(result, models.PointStruct) + self.assertIn("sparse", result.vector) + sparse_vec = result.vector["sparse"] + self.assertIsInstance(sparse_vec, models.SparseVector) + self.assertEqual(sparse_vec.indices, [0, 1]) + self.assertEqual(sparse_vec.values, [0.5, 0.3]) + + def test_both_dense_and_sparse(self): + item = EmbeddableItem( + id="3", + content=Content(text="test"), + embedding=Embedding( + dense_embedding=[1.0, 2.0], + sparse_embedding=([0], [0.9]), + ), + ) + result = self.convert(item) + self.assertEqual(set(result.vector.keys()), {"dense", "sparse"}) + self.assertEqual(result.vector["dense"], [1.0, 2.0]) + self.assertEqual(result.id, 3) + + def test_raises_when_no_embedding(self): + item = EmbeddableItem( + id="4", + content=Content(text="test"), + ) + with self.assertRaises(ValueError): + self.convert(item) + + def test_string_digit_id_converted_to_int(self): + item = EmbeddableItem( + id="42", + content=Content(text="test"), + embedding=Embedding(dense_embedding=[0.1, 0.2]), + ) + result = self.convert(item) + self.assertEqual(result.id, 42) + self.assertIsInstance(result.id, int) + + def test_non_digit_string_id_preserved(self): + item = EmbeddableItem( + id="abc-123", + content=Content(text="test"), + embedding=Embedding(dense_embedding=[0.1, 0.2]), + ) + result = self.convert(item) + self.assertEqual(result.id, "abc-123") + self.assertIsInstance(result.id, str) + + def test_integer_id_preserved(self): + item = EmbeddableItem( + id="99", + content=Content(text="test"), + embedding=Embedding(dense_embedding=[0.1, 0.2]), + ) + result = self.convert(item) + self.assertEqual(result.id, 99) + self.assertIsInstance(result.id, int) + + def test_none_metadata_becomes_none_payload(self): + item = EmbeddableItem( + id="1", + content=Content(text="test"), + embedding=Embedding(dense_embedding=[0.1, 0.2]), + metadata={}, + ) + result = self.convert(item) + self.assertIsNone(result.payload) + + def test_custom_vector_keys(self): + config = QdrantWriteConfig( + connection_params=QdrantConnectionParameters(location=":memory:"), + collection_name="test", + dense_embedding_key="my_dense", + sparse_embedding_key="my_sparse", + ) + convert = config.create_converter() + item = EmbeddableItem( + id="1", + content=Content(text="test"), + embedding=Embedding( + dense_embedding=[1.0], + sparse_embedding=([0], [0.5]), + ), + ) + result = convert(item) + self.assertIn("my_dense", result.vector) + self.assertIn("my_sparse", result.vector) + self.assertNotIn("dense", result.vector) + self.assertNotIn("sparse", result.vector) + + def test_payload_includes_metadata(self): + item = EmbeddableItem( + id="1", + content=Content(text="test"), + embedding=Embedding(dense_embedding=[1.0]), + metadata={ + "source": "test", "score": 0.95 + }, + ) + result = self.convert(item) + self.assertEqual(result.payload, {"source": "test", "score": 0.95}) + + def test_convert_from_text_factory(self): + item = EmbeddableItem.from_text("hello", metadata={"source": "test"}) + item.embedding = Embedding(dense_embedding=[0.5, 0.5]) + result = self.convert(item) + self.assertIsInstance(result, models.PointStruct) + self.assertIn("dense", result.vector) + + +@unittest.skipIf(not QDRANT_AVAILABLE, "qdrant dependencies not installed.") +class TestQdrantWriteFnBatching(unittest.TestCase): + def setUp(self): + self.config = QdrantWriteConfig( + connection_params=QdrantConnectionParameters(location=":memory:"), + collection_name="test", + batch_size=3, + ) + self.fn = _QdrantWriteFn(self.config) + self.fn._client = mock.MagicMock() + self.fn.start_bundle() + + def test_batch_size_triggers_flush_correctly(self): + client = self.fn._client + for i in range(5): + self.fn.process( + EmbeddableItem( + id=str(i), + content=Content(text="test"), + embedding=Embedding(dense_embedding=[float(i)]), + )) + self.fn.finish_bundle() + + self.assertEqual(client.upsert.call_count, 2) + first = client.upsert.call_args_list[0][1]["points"] + second = client.upsert.call_args_list[1][1]["points"] + self.assertEqual(len(first), 3) + self.assertEqual(len(second), 2) + self.assertEqual(first[0].id, "0") + self.assertEqual(first[1].id, "1") + self.assertEqual(first[2].id, "2") + self.assertEqual(second[0].id, "3") + self.assertEqual(second[1].id, "4") + + def test_partial_batch_flushed_on_finish_bundle(self): + for i in range(2): + self.fn.process( + EmbeddableItem( + id=str(i), + content=Content(text="test"), + embedding=Embedding(dense_embedding=[float(i)]), + )) + self.fn.finish_bundle() + + points = self.fn._client.upsert.call_args[1]["points"] + self.assertEqual(len(points), 2) + + def test_byte_size_exceeded_triggers_flush(self): + item = EmbeddableItem( + id="1", + content=Content( + text="a" * 256, + image=b"x" * 1024, + ), + ) + item_size = get_deep_size(item) + + config = QdrantWriteConfig( + connection_params=QdrantConnectionParameters(location=":memory:"), + collection_name="test", + batch_size=10, + max_batch_byte_size=item_size * 2, + ) + fn = _QdrantWriteFn(config) + fn._client = mock.MagicMock() + fn.start_bundle() + client = fn._client + + for i in range(3): + fn.process( + EmbeddableItem( + id=str(i), + content=Content( + text="a" * 256, + image=b"x" * 1024, + ), + )) + fn.finish_bundle() + + self.assertEqual(client.upsert.call_count, 2) + first = client.upsert.call_args_list[0][1]["points"] + second = client.upsert.call_args_list[1][1]["points"] + self.assertEqual(len(first), 2) + self.assertEqual(len(second), 1) + + +@unittest.skipIf(not QDRANT_AVAILABLE, "qdrant dependencies not installed.") +class TestQdrantWriteFnRetries(unittest.TestCase): + def setUp(self): + self.config = QdrantWriteConfig( + connection_params=QdrantConnectionParameters(location=":memory:"), + collection_name="test", + ) + self.fn = _QdrantWriteFn(self.config) + self.fn._client = mock.MagicMock() + self.fn._batch = [ + EmbeddableItem( + id="1", + content=Content(text="test"), + embedding=Embedding(dense_embedding=[1.0]), + ) + ] + self.fn._batch_byte_size = 100 + + def test_retry_on_unexpected_response(self): + self.fn._client.upsert.side_effect = [ + UnexpectedResponse(429, "error", b"", None), + None, + ] + with mock.patch("time.sleep") as mock_sleep: + self.fn._flush() + self.assertEqual(self.fn._client.upsert.call_count, 2) + mock_sleep.assert_called_once_with(2) + + def test_retry_on_response_handling_exception(self): + self.fn._client.upsert.side_effect = [ + ResponseHandlingException(Exception("error")), + None, + ] + with mock.patch("time.sleep") as mock_sleep: + self.fn._flush() + self.assertEqual(self.fn._client.upsert.call_count, 2) + mock_sleep.assert_called_once_with(2) + + def test_retry_on_grpc_error(self): + self.fn._client.upsert.side_effect = [ + grpc.RpcError("error"), + None, + ] + with mock.patch("time.sleep") as mock_sleep: + self.fn._flush() + self.assertEqual(self.fn._client.upsert.call_count, 2) + mock_sleep.assert_called_once_with(2) + + def test_rate_limit_does_not_increment_attempt(self): + exc = ResourceExhaustedResponse("rate limited", 0) + exc.retry_after_s = 0.01 + self.fn._client.upsert.side_effect = [exc, None] + with mock.patch("time.sleep") as mock_sleep: + self.fn._flush() + self.assertEqual(self.fn._client.upsert.call_count, 2) + mock_sleep.assert_called_once_with(0.01) + + def test_multiple_rate_limits_dont_exhaust_retries(self): + exc = ResourceExhaustedResponse("rate limited", 0) + exc.retry_after_s = 0.01 + self.fn._client.upsert.side_effect = [exc, exc, exc, None] + with mock.patch("time.sleep") as mock_sleep: + self.fn._flush() + self.assertEqual(self.fn._client.upsert.call_count, 4) + self.assertEqual(mock_sleep.call_count, 3) + + def test_rate_limit_then_error_then_success(self): + exc_rate = ResourceExhaustedResponse("rate limited", 0) + exc_rate.retry_after_s = 0.01 + exc_error = UnexpectedResponse(429, "error", b"", None) + self.fn._client.upsert.side_effect = [exc_error, exc_rate, None] + with mock.patch("time.sleep") as mock_sleep: + self.fn._flush() + self.assertEqual(self.fn._client.upsert.call_count, 3) + self.assertEqual(mock_sleep.call_args_list[0], mock.call(2)) + self.assertEqual(mock_sleep.call_args_list[1], mock.call(0.01)) + + def test_exponential_backoff_values(self): + self.fn._client.upsert.side_effect = [ + UnexpectedResponse(429, "e1", b"", None), + UnexpectedResponse(429, "e2", b"", None), + UnexpectedResponse(429, "e3", b"", None), + None, + ] + with mock.patch("time.sleep") as mock_sleep: + self.fn._flush() + self.assertEqual(self.fn._client.upsert.call_count, 4) + self.assertEqual(mock_sleep.call_args_list[0], mock.call(2)) + self.assertEqual(mock_sleep.call_args_list[1], mock.call(4)) + self.assertEqual(mock_sleep.call_args_list[2], mock.call(8)) + + def test_raises_after_max_retries(self): + self.fn._client.upsert.side_effect = [ + UnexpectedResponse(429, "e1", b"", None), + UnexpectedResponse(429, "e2", b"", None), + UnexpectedResponse(429, "e3", b"", None), + UnexpectedResponse(429, "e4", b"", None), + ] + with mock.patch("time.sleep") as mock_sleep: + with self.assertRaises(UnexpectedResponse): + self.fn._flush() + self.assertEqual(self.fn._client.upsert.call_count, 4) + self.assertEqual(mock_sleep.call_count, 3) + + def test_raises_on_last_non_rate_limit_attempt(self): + exc_rate = ResourceExhaustedResponse("rate limited", 0) + exc_rate.retry_after_s = 0.01 + self.fn._client.upsert.side_effect = [ + exc_rate, + UnexpectedResponse(429, "e1", b"", None), + UnexpectedResponse(429, "e2", b"", None), + UnexpectedResponse(429, "e3", b"", None), + UnexpectedResponse(429, "e4", b"", None), + ] + with mock.patch("time.sleep") as mock_sleep: + with self.assertRaises(UnexpectedResponse): + self.fn._flush() + self.assertEqual(self.fn._client.upsert.call_count, 5) + + +if __name__ == "__main__": + unittest.main() From 9f7fceacc5e0e67f9e0c69a33e558c22b7d39656 Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Thu, 14 May 2026 16:22:25 +0200 Subject: [PATCH 11/18] fix: import linting errors on qdrant --- .../apache_beam/ml/rag/ingestion/qdrant.py | 12 +++++++---- .../ml/rag/ingestion/qdrant_it_test.py | 3 ++- .../ml/rag/ingestion/qdrant_test.py | 20 +++++++++---------- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py index 53b8eff54176..1f8fdb31c983 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py @@ -17,16 +17,20 @@ import logging import time from collections.abc import Callable -from dataclasses import dataclass, field -from typing import Any, Optional +from dataclasses import dataclass +from dataclasses import field +from typing import Any +from typing import Optional import grpc from objsize import get_deep_size try: - from qdrant_client import QdrantClient, models + from qdrant_client import QdrantClient + from qdrant_client import models from qdrant_client.common.client_exceptions import ResourceExhaustedResponse - from qdrant_client.http.exceptions import ResponseHandlingException, UnexpectedResponse + from qdrant_client.http.exceptions import ResponseHandlingException + from qdrant_client.http.exceptions import UnexpectedResponse except ImportError: logging.warning("Qdrant client library is not installed.") diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py index 069fe7e12331..ea97ec6638e8 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py @@ -28,7 +28,8 @@ # pylint: disable=ungrouped-imports try: - from qdrant_client import QdrantClient, models + from qdrant_client import QdrantClient + from qdrant_client import models QDRANT_AVAILABLE = True except ImportError: QDRANT_AVAILABLE = False diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_test.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_test.py index 3d49cd03e339..ff4ee14e97a0 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_test.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_test.py @@ -18,12 +18,10 @@ from unittest import mock try: - from qdrant_client import QdrantClient, models + from qdrant_client import models from qdrant_client.common.client_exceptions import ResourceExhaustedResponse - from qdrant_client.http.exceptions import ( - ResponseHandlingException, - UnexpectedResponse, - ) + from qdrant_client.http.exceptions import ResponseHandlingException + from qdrant_client.http.exceptions import UnexpectedResponse QDRANT_AVAILABLE = True except ImportError: @@ -32,12 +30,12 @@ import grpc from objsize import get_deep_size -from apache_beam.ml.rag.ingestion.qdrant import ( - _QdrantWriteFn, - QdrantWriteConfig, - QdrantConnectionParameters, -) -from apache_beam.ml.rag.types import EmbeddableItem, Content, Embedding +from apache_beam.ml.rag.ingestion.qdrant import QdrantConnectionParameters +from apache_beam.ml.rag.ingestion.qdrant import QdrantWriteConfig +from apache_beam.ml.rag.ingestion.qdrant import _QdrantWriteFn +from apache_beam.ml.rag.types import Content +from apache_beam.ml.rag.types import EmbeddableItem +from apache_beam.ml.rag.types import Embedding class TestQdrantConnectionParameters(unittest.TestCase): From 413716a99c1b5393db643c93a9718eeb6eb639d3 Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Thu, 21 May 2026 09:12:16 +0200 Subject: [PATCH 12/18] fix: use direct runner for qdrant integration tests --- .../ml/rag/ingestion/qdrant_it_test.py | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py index ea97ec6638e8..ff916a924515 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py @@ -30,6 +30,7 @@ try: from qdrant_client import QdrantClient from qdrant_client import models + QDRANT_AVAILABLE = True except ImportError: QDRANT_AVAILABLE = False @@ -60,7 +61,7 @@ @unittest.skipIf(not QDRANT_AVAILABLE, "qdrant dependencies not installed.") class TestQdrantIngestion(unittest.TestCase): @contextlib.contextmanager - def qdrant_client(self) -> 'QdrantClient': + def qdrant_client(self) -> "QdrantClient": client = QdrantClient(path=self._temp_dir.name) try: yield client @@ -97,7 +98,9 @@ def test_write_on_non_existent_collection(self): ) with self.assertRaises(Exception): - with TestPipeline() as p: + p = TestPipeline() + p.not_use_test_runner_api = True + with p: _ = p | beam.Create(TEST_CORPUS) | write_config.create_write_transform() def test_write_dense_embeddings_only(self): @@ -107,7 +110,9 @@ def test_write_dense_embeddings_only(self): batch_size=len(TEST_CORPUS), ) - with TestPipeline() as p: + p = TestPipeline() + p.not_use_test_runner_api = True + with p: _ = p | beam.Create(TEST_CORPUS) | write_config.create_write_transform() with self.qdrant_client() as client: @@ -152,7 +157,9 @@ def test_write_sparse_embeddings_only(self): batch_size=len(sparse_corpus), ) - with TestPipeline() as p: + p = TestPipeline() + p.not_use_test_runner_api = True + with p: _ = p | beam.Create(sparse_corpus) | write_config.create_write_transform() with self.qdrant_client() as client: @@ -206,7 +213,9 @@ def test_write_both_dense_and_sparse(self): batch_size=len(hybrid_corpus), ) - with TestPipeline() as p: + p = TestPipeline() + p.not_use_test_runner_api = True + with p: _ = p | beam.Create(hybrid_corpus) | write_config.create_write_transform() with self.qdrant_client() as client: @@ -228,7 +237,8 @@ def test_write_both_dense_and_sparse(self): "dense": item.dense_embedding, "sparse": models.SparseVector( indices=item.sparse_embedding[0], - values=item.sparse_embedding[1]), + values=item.sparse_embedding[1], + ), }, payload=item.metadata, ) @@ -250,7 +260,9 @@ def test_write_with_batching(self): batch_size=3, ) - with TestPipeline() as p: + p = TestPipeline() + p.not_use_test_runner_api = True + with p: _ = p | beam.Create(batch_corpus) | write_config.create_write_transform() with self.qdrant_client() as client: @@ -292,7 +304,9 @@ def test_write_with_byte_size_limit(self): max_batch_byte_size=15_000, ) - with TestPipeline() as p: + p = TestPipeline() + p.not_use_test_runner_api = True + with p: _ = ( p | beam.Create(byte_size_corpus) From 34629b967d9061d668a43e2508bed4931abc050a Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Thu, 21 May 2026 09:13:33 +0200 Subject: [PATCH 13/18] fix: safe guard qdrant client close --- sdks/python/apache_beam/ml/rag/ingestion/qdrant.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py index 1f8fdb31c983..abe9efc56cbf 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant.py @@ -282,8 +282,10 @@ def setup(self): def teardown(self): if self._client: - self._client.close() - self._client = None + try: + self._client.close() + finally: + self._client = None def finish_bundle(self): self._flush() From da5bf828db2a61290aaa15267d138c4f29f286b7 Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Thu, 21 May 2026 20:51:19 +0200 Subject: [PATCH 14/18] fix: use testcontainers for qdrant it test --- .../ml/rag/ingestion/qdrant_it_test.py | 164 ++++++++---------- sdks/python/setup.py | 2 +- 2 files changed, 76 insertions(+), 90 deletions(-) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py index ff916a924515..8b4860b426cc 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py @@ -14,8 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import contextlib -import tempfile +import pytest import unittest import apache_beam as beam @@ -28,12 +27,16 @@ # pylint: disable=ungrouped-imports try: - from qdrant_client import QdrantClient from qdrant_client import models QDRANT_AVAILABLE = True except ImportError: QDRANT_AVAILABLE = False + +try: + from testcontainers.qdrant import QdrantContainer +except ImportError: + QdrantContainer = None # pylint: enable=ungrouped-imports TEST_CORPUS = [ @@ -59,35 +62,35 @@ @unittest.skipIf(not QDRANT_AVAILABLE, "qdrant dependencies not installed.") +@unittest.skipIf(not QdrantContainer, "qdrant test_container not installed.") +@pytest.mark.uses_testcontainers class TestQdrantIngestion(unittest.TestCase): - @contextlib.contextmanager - def qdrant_client(self) -> "QdrantClient": - client = QdrantClient(path=self._temp_dir.name) - try: - yield client - finally: - client.close() + @classmethod + def setUpClass(cls): + cls._container = QdrantContainer() + cls._container.start() + cls._host = cls._container.get_container_host_ip() + cls._port = int(cls._container.get_exposed_port(6333)) + cls._connection_params = QdrantConnectionParameters( + host=cls._host, port=cls._port) + cls.client = cls._container.get_client() def setUp(self): - self._temp_dir = tempfile.TemporaryDirectory() self._collection_name = f"test_collection_{self._testMethodName}" - with self.qdrant_client() as client: - client.create_collection( - collection_name=self._collection_name, - vectors_config={ - "dense": models.VectorParams( - size=2, distance=models.Distance.COSINE) - }, - sparse_vectors_config={"sparse": models.SparseVectorParams()}, - ) - assert client.collection_exists(collection_name=self._collection_name) - - self._connection_params = QdrantConnectionParameters( - path=self._temp_dir.name) + self.client.create_collection( + collection_name=self._collection_name, + vectors_config={ + "dense": models.VectorParams( + size=2, distance=models.Distance.COSINE) + }, + sparse_vectors_config={"sparse": models.SparseVectorParams()}, + ) + assert self.client.collection_exists(collection_name=self._collection_name) - def tearDown(self): - self._temp_dir.cleanup() + @classmethod + def tearDownClass(cls): + cls._container.stop() def test_write_on_non_existent_collection(self): non_existent = "nonexistent_collection" @@ -98,9 +101,7 @@ def test_write_on_non_existent_collection(self): ) with self.assertRaises(Exception): - p = TestPipeline() - p.not_use_test_runner_api = True - with p: + with TestPipeline(is_integration_test=True) as p: _ = p | beam.Create(TEST_CORPUS) | write_config.create_write_transform() def test_write_dense_embeddings_only(self): @@ -110,21 +111,18 @@ def test_write_dense_embeddings_only(self): batch_size=len(TEST_CORPUS), ) - p = TestPipeline() - p.not_use_test_runner_api = True - with p: + with TestPipeline(is_integration_test=True) as p: _ = p | beam.Create(TEST_CORPUS) | write_config.create_write_transform() - with self.qdrant_client() as client: - count_result = client.count(collection_name=self._collection_name) - self.assertEqual(count_result.count, len(TEST_CORPUS)) + count_result = self.client.count(collection_name=self._collection_name) + self.assertEqual(count_result.count, len(TEST_CORPUS)) - points, _ = client.scroll( - collection_name=self._collection_name, - limit=100, - with_payload=True, - with_vectors=True, - ) + points, _ = self.client.scroll( + collection_name=self._collection_name, + limit=100, + with_payload=True, + with_vectors=True, + ) points_by_id = {p.id: p for p in points} for item in TEST_CORPUS: @@ -157,21 +155,18 @@ def test_write_sparse_embeddings_only(self): batch_size=len(sparse_corpus), ) - p = TestPipeline() - p.not_use_test_runner_api = True - with p: + with TestPipeline(is_integration_test=True) as p: _ = p | beam.Create(sparse_corpus) | write_config.create_write_transform() - with self.qdrant_client() as client: - count_result = client.count(collection_name=self._collection_name) - self.assertEqual(count_result.count, len(sparse_corpus)) + count_result = self.client.count(collection_name=self._collection_name) + self.assertEqual(count_result.count, len(sparse_corpus)) - points, _ = client.scroll( - collection_name=self._collection_name, - limit=100, - with_payload=True, - with_vectors=True, - ) + points, _ = self.client.scroll( + collection_name=self._collection_name, + limit=100, + with_payload=True, + with_vectors=True, + ) points_by_id = {p.id: p for p in points} for item in sparse_corpus: @@ -213,21 +208,18 @@ def test_write_both_dense_and_sparse(self): batch_size=len(hybrid_corpus), ) - p = TestPipeline() - p.not_use_test_runner_api = True - with p: + with TestPipeline(is_integration_test=True) as p: _ = p | beam.Create(hybrid_corpus) | write_config.create_write_transform() - with self.qdrant_client() as client: - count_result = client.count(collection_name=self._collection_name) - self.assertEqual(count_result.count, len(hybrid_corpus)) + count_result = self.client.count(collection_name=self._collection_name) + self.assertEqual(count_result.count, len(hybrid_corpus)) - points, _ = client.scroll( - collection_name=self._collection_name, - limit=100, - with_payload=True, - with_vectors=True, - ) + points, _ = self.client.scroll( + collection_name=self._collection_name, + limit=100, + with_payload=True, + with_vectors=True, + ) points_by_id = {p.id: p for p in points} for item in hybrid_corpus: @@ -260,21 +252,18 @@ def test_write_with_batching(self): batch_size=3, ) - p = TestPipeline() - p.not_use_test_runner_api = True - with p: + with TestPipeline(is_integration_test=True) as p: _ = p | beam.Create(batch_corpus) | write_config.create_write_transform() - with self.qdrant_client() as client: - count_result = client.count(collection_name=self._collection_name) - self.assertEqual(count_result.count, len(batch_corpus)) + count_result = self.client.count(collection_name=self._collection_name) + self.assertEqual(count_result.count, len(batch_corpus)) - points, _ = client.scroll( - collection_name=self._collection_name, - limit=100, - with_payload=True, - with_vectors=True, - ) + points, _ = self.client.scroll( + collection_name=self._collection_name, + limit=100, + with_payload=True, + with_vectors=True, + ) points_by_id = {p.id: p for p in points} for item in batch_corpus: @@ -304,24 +293,21 @@ def test_write_with_byte_size_limit(self): max_batch_byte_size=15_000, ) - p = TestPipeline() - p.not_use_test_runner_api = True - with p: + with TestPipeline(is_integration_test=True) as p: _ = ( p | beam.Create(byte_size_corpus) | write_config.create_write_transform()) - with self.qdrant_client() as client: - count_result = client.count(collection_name=self._collection_name) - self.assertEqual(count_result.count, len(byte_size_corpus)) + count_result = self.client.count(collection_name=self._collection_name) + self.assertEqual(count_result.count, len(byte_size_corpus)) - points, _ = client.scroll( - collection_name=self._collection_name, - limit=100, - with_payload=True, - with_vectors=True, - ) + points, _ = self.client.scroll( + collection_name=self._collection_name, + limit=100, + with_payload=True, + with_vectors=True, + ) points_by_id = {p.id: p for p in points} for item in byte_size_corpus: diff --git a/sdks/python/setup.py b/sdks/python/setup.py index f68223ff4ab9..46c6f4b94904 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -509,7 +509,7 @@ def get_portability_package_data(): 'scikit-learn>=0.20.0,<1.8.0', 'sqlalchemy>=1.3,<3.0', 'psycopg2-binary>=2.8.5,<3.0', - 'testcontainers[mysql,kafka,milvus]>=4.0.0,<5.0.0', + 'testcontainers[mysql,kafka,milvus,qdrant]>=4.0.0,<5.0.0', 'cryptography>=41.0.2', # TODO(https://github.com/apache/beam/issues/36951): need to # further investigate the cause From 1130a810ddd5bab580818352d75bcea75de4c2e6 Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Thu, 21 May 2026 21:17:26 +0200 Subject: [PATCH 15/18] chore: fix import order --- sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py index 8b4860b426cc..684458a2c55b 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py @@ -14,9 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import pytest import unittest +import pytest + import apache_beam as beam from apache_beam.ml.rag.ingestion.qdrant import QdrantConnectionParameters from apache_beam.ml.rag.ingestion.qdrant import QdrantWriteConfig From 2f78d0765dc1cc6de23ad78dc3d06df77a5eb1d2 Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Thu, 21 May 2026 22:28:21 +0200 Subject: [PATCH 16/18] fix: move test client creation into setup --- sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py index 684458a2c55b..c514c0a6fdfc 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py @@ -74,11 +74,11 @@ def setUpClass(cls): cls._port = int(cls._container.get_exposed_port(6333)) cls._connection_params = QdrantConnectionParameters( host=cls._host, port=cls._port) - cls.client = cls._container.get_client() def setUp(self): self._collection_name = f"test_collection_{self._testMethodName}" + self.client = self._container.get_client() self.client.create_collection( collection_name=self._collection_name, vectors_config={ From c702107e6d0993f62dd5b8150f94540af69db534 Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Thu, 21 May 2026 23:14:40 +0200 Subject: [PATCH 17/18] chore: mark qdrant integration tests as require_docker_in_docker --- sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py index c514c0a6fdfc..b82ce2b6e74d 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/qdrant_it_test.py @@ -64,7 +64,7 @@ @unittest.skipIf(not QDRANT_AVAILABLE, "qdrant dependencies not installed.") @unittest.skipIf(not QdrantContainer, "qdrant test_container not installed.") -@pytest.mark.uses_testcontainers +@pytest.mark.require_docker_in_docker class TestQdrantIngestion(unittest.TestCase): @classmethod def setUpClass(cls): From 181296cfe5f8a38956461ff0066240cef7956feb Mon Sep 17 00:00:00 2001 From: Michael Gruschke Date: Fri, 29 May 2026 11:00:31 +0200 Subject: [PATCH 18/18] chore: move changes msg to 2.75 --- CHANGES.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index c1b6232d8dd5..3b4af42daf8d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -61,6 +61,7 @@ * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). +* (Python) Added [Qdrant](https://qdrant.tech/) VectorDatabaseWriteConfig implementation ([#38141](https://github.com/apache/beam/issues/38141)). ## I/Os @@ -116,7 +117,6 @@ * (Python) Supported Python user type in Beam SQL. For example, SQL statements like `SELECT some_field from PCOLLECTION` can now operate a PCollection of Beam Row containing pickable Python user type ([#20738](https://github.com/apache/beam/issues/20738)). * (Python) Introduced `beam.coders.registry.register_row` as preferred API to register a named tuple or dataclass with a Beam Row. At pipelne runtime, the original type associated with the registered row are preserved across the serialization boundary ([#38108](https://github.com/apache/beam/issues/38108)). * (Python) Added `type_overrides` parameter to `WriteToBigQuery` allowing users to specify custom BigQuery to Python type mappings when using Storage Write API. This enables support for types like DATE, DATETIME, and JSON (Python) ([#25946](https://github.com/apache/beam/issues/25946)). -* (Python) Added [Qdrant](https://qdrant.tech/) VectorDatabaseWriteConfig implementation ([#38141](https://github.com/apache/beam/issues/38141)). ## Breaking Changes @@ -2463,4 +2463,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss ## Highlights -- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). \ No newline at end of file +- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).