From ac37fb7fe73eb6b6983a030fe301bb33668ce67a Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Thu, 11 Sep 2025 14:53:16 +0200 Subject: [PATCH 1/3] add status to Pipeline --- src/glassflow/etl/models/__init__.py | 3 ++- src/glassflow/etl/models/pipeline.py | 11 +++++++++++ src/glassflow/etl/pipeline.py | 8 +++++++- tests/conftest.py | 13 +++++++++++-- tests/test_client.py | 8 ++++++-- tests/test_pipeline.py | 4 ++-- 6 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/glassflow/etl/models/__init__.py b/src/glassflow/etl/models/__init__.py index 3b5ae08..4f39346 100644 --- a/src/glassflow/etl/models/__init__.py +++ b/src/glassflow/etl/models/__init__.py @@ -8,7 +8,7 @@ JoinSourceConfigPatch, JoinType, ) -from .pipeline import PipelineConfig, PipelineConfigPatch +from .pipeline import PipelineConfig, PipelineConfigPatch, PipelineStatus from .sink import SinkConfig, SinkConfigPatch, SinkType, TableMapping from .source import ( ConsumerGroupOffset, @@ -40,6 +40,7 @@ "JoinType", "PipelineConfig", "PipelineConfigPatch", + "PipelineStatus", "SinkConfig", "SinkType", "TableMapping", diff --git a/src/glassflow/etl/models/pipeline.py b/src/glassflow/etl/models/pipeline.py index 8d79905..ea4bacc 100644 --- a/src/glassflow/etl/models/pipeline.py +++ b/src/glassflow/etl/models/pipeline.py @@ -4,12 +4,23 @@ from pydantic import BaseModel, Field, field_validator, model_validator from ..errors import InvalidDataTypeMappingError +from .base import CaseInsensitiveStrEnum from .data_types import kafka_to_clickhouse_data_type_mappings from .join import JoinConfig, JoinConfigPatch from .sink import SinkConfig, SinkConfigPatch from .source import SourceConfig, SourceConfigPatch +class PipelineStatus(CaseInsensitiveStrEnum): + CREATED = "Created" + RUNNING = "Running" + PAUSING = "Pausing" + PAUSED = "Paused" + RESUMING = "Resuming" + TERMINATING = "Terminating" + TERMINATED = "Terminated" + FAILED = "Failed" + class PipelineConfig(BaseModel): pipeline_id: str name: Optional[str] = Field(default=None) diff --git a/src/glassflow/etl/pipeline.py b/src/glassflow/etl/pipeline.py index ef8406d..616f1b5 100644 --- a/src/glassflow/etl/pipeline.py +++ b/src/glassflow/etl/pipeline.py @@ -52,6 +52,7 @@ def __init__( self.config = None self._dlq = DLQ(pipeline_id=self.pipeline_id, host=host) + self.status: models.PipelineStatus | None = None def get(self) -> Pipeline: """Fetch a pipeline by its ID. @@ -67,6 +68,7 @@ def get(self) -> Pipeline: "GET", f"{self.ENDPOINT}/{self.pipeline_id}", event_name="PipelineGet" ) self.config = models.PipelineConfig.model_validate(response.json()) + self.status = models.PipelineStatus(response.json()["status"]) self._dlq = DLQ(pipeline_id=self.pipeline_id, host=self.host) return self @@ -94,6 +96,7 @@ def create(self) -> Pipeline: ), event_name="PipelineCreated", ) + self.status = models.PipelineStatus.CREATED return self except errors.ForbiddenError as e: @@ -160,6 +163,7 @@ def delete(self, terminate: bool = True) -> None: self.get() endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/terminate" self._request("DELETE", endpoint, event_name="PipelineDeleted") + self.status = models.PipelineStatus.TERMINATING def pause(self) -> Pipeline: """Pauses the pipeline with the given ID. @@ -191,11 +195,13 @@ def health(self) -> dict[str, Any]: Returns: dict: Pipeline health """ - return self._request( + response = self._request( "GET", f"{self.ENDPOINT}/{self.pipeline_id}/health", event_name="PipelineHealth", ).json() + self.status = models.PipelineStatus(response["overall_status"]) + return response def to_dict(self) -> dict[str, Any]: """Convert the pipeline configuration to a dictionary. diff --git a/tests/conftest.py b/tests/conftest.py index ed717fc..94804d7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,6 +21,15 @@ def valid_config() -> dict: return pipeline_configs.get_valid_pipeline_config() + +@pytest.fixture +def get_pipeline_response(valid_config) -> dict: + """Fixture for a valid pipeline configuration with status.""" + config = valid_config + config["status"] = "Running" + return config + + @pytest.fixture def valid_config_without_joins() -> dict: """Fixture for a valid pipeline configuration without joins.""" @@ -88,11 +97,11 @@ def mock_connection_error(): @pytest.fixture -def mock_success_get_pipeline(valid_config): +def mock_success_get_pipeline(get_pipeline_response): """Fixture for a successful GET pipeline response.""" return mock_responses.create_mock_response_factory()( status_code=200, - json_data=valid_config, + json_data=get_pipeline_response, ) diff --git a/tests/test_client.py b/tests/test_client.py index 8de60ad..e3b1035 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -18,12 +18,16 @@ def test_client_init(self): assert client.host == "https://example.com" assert client.http_client.base_url == "https://example.com" - def test_client_get_pipeline_success(self, valid_config, mock_success_response): + def test_client_get_pipeline_success( + self, + get_pipeline_response, + mock_success_response + ): """Test successful pipeline retrieval by ID.""" client = Client() pipeline_id = "test-pipeline-id" - mock_success_response.json.return_value = valid_config + mock_success_response.json.return_value = get_pipeline_response with patch( "httpx.Client.request", return_value=mock_success_response diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index eb31c38..b8f32c9 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -91,14 +91,14 @@ def test_lifecycle_operations( method, endpoint, params, - valid_config, + get_pipeline_response, ): """Test common pipeline lifecycle operations.""" with patch( "httpx.Client.request", return_value=mock_success_response ) as mock_request: if method == "GET": - mock_request.return_value.json.return_value = valid_config + mock_request.return_value.json.return_value = get_pipeline_response result = getattr(pipeline, operation)(**params) expected_endpoint = f"{pipeline.ENDPOINT}/{pipeline.pipeline_id}{endpoint}" mock_request.assert_called_once_with(method, expected_endpoint) From 3c579d473b026108f5a24d39cfd9b3937891407c Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Thu, 11 Sep 2025 14:53:45 +0200 Subject: [PATCH 2/3] chore: fix format --- src/glassflow/etl/models/pipeline.py | 1 + tests/conftest.py | 1 - tests/test_client.py | 4 +--- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/glassflow/etl/models/pipeline.py b/src/glassflow/etl/models/pipeline.py index ea4bacc..9555bb8 100644 --- a/src/glassflow/etl/models/pipeline.py +++ b/src/glassflow/etl/models/pipeline.py @@ -21,6 +21,7 @@ class PipelineStatus(CaseInsensitiveStrEnum): TERMINATED = "Terminated" FAILED = "Failed" + class PipelineConfig(BaseModel): pipeline_id: str name: Optional[str] = Field(default=None) diff --git a/tests/conftest.py b/tests/conftest.py index 94804d7..322338f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,7 +21,6 @@ def valid_config() -> dict: return pipeline_configs.get_valid_pipeline_config() - @pytest.fixture def get_pipeline_response(valid_config) -> dict: """Fixture for a valid pipeline configuration with status.""" diff --git a/tests/test_client.py b/tests/test_client.py index e3b1035..fe1694c 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -19,9 +19,7 @@ def test_client_init(self): assert client.http_client.base_url == "https://example.com" def test_client_get_pipeline_success( - self, - get_pipeline_response, - mock_success_response + self, get_pipeline_response, mock_success_response ): """Test successful pipeline retrieval by ID.""" client = Client() From 529a8bd3be0d7e81d97b776136b8564d9533ffca Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 11 Sep 2025 12:54:46 +0000 Subject: [PATCH 3/3] chore: bump version to 3.0.2 --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index cb2b00e..b502146 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.0.1 +3.0.2