From fba64e4103e3b589f87cb969dedfc36dccd5d06b Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Thu, 11 Sep 2025 15:39:43 +0200 Subject: [PATCH] add pipeline pause and resume functionalities --- README.md | 14 ++++++++++++++ src/glassflow/etl/pipeline.py | 10 ++++++++-- tests/test_pipeline.py | 26 +++++++++++++++++++------- 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 5580c06..bc02ab7 100644 --- a/README.md +++ b/README.md @@ -170,6 +170,20 @@ for pipeline in pipelines: print(f"State: {pipeline['state']}") ``` +### Pause / Resume Pipeline + +```python +pipeline = client.get_pipeline("my-pipeline-id") +pipeline.pause() +print(pipeline.status) +``` + +```python +pipeline = client.get_pipeline("my-pipeline-id") +pipeline.resume() +print(pipeline.status) +``` + ### Delete pipeline ```python diff --git a/src/glassflow/etl/pipeline.py b/src/glassflow/etl/pipeline.py index 616f1b5..773425d 100644 --- a/src/glassflow/etl/pipeline.py +++ b/src/glassflow/etl/pipeline.py @@ -175,7 +175,10 @@ def pause(self) -> Pipeline: PipelineNotFoundError: If pipeline is not found APIError: If the API request fails """ - raise NotImplementedError("Pausing is not implemented") + endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/pause" + self._request("POST", endpoint, event_name="PipelinePaused") + self.status = models.PipelineStatus.PAUSING + return self def resume(self) -> Pipeline: """Resumes the pipeline with the given ID. @@ -187,7 +190,10 @@ def resume(self) -> Pipeline: PipelineNotFoundError: If pipeline is not found APIError: If the API request fails """ - raise NotImplementedError("Resuming is not implemented") + endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/resume" + self._request("POST", endpoint, event_name="PipelineResumed") + self.status = models.PipelineStatus.RESUMING + return self def health(self) -> dict[str, Any]: """Get the health of the pipeline. diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index b8f32c9..f201a77 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -5,7 +5,7 @@ import pytest from pydantic import ValidationError -from glassflow.etl import errors +from glassflow.etl import errors, models from glassflow.etl.models import PipelineConfig from glassflow.etl.pipeline import Pipeline from tests.data import error_scenarios, mock_responses @@ -26,6 +26,7 @@ def test_create_success(self, pipeline, mock_success_response): json=pipeline.config.model_dump(mode="json", by_alias=True), ) assert result == pipeline + assert pipeline.status == models.PipelineStatus.CREATED def test_create_invalid_config(self, invalid_config): """Test pipeline creation with invalid configuration.""" @@ -77,10 +78,18 @@ class TestPipelineLifecycle: """Tests for pause, resume, delete operations.""" @pytest.mark.parametrize( - "operation,method,endpoint,params", + "operation,method,endpoint,params,status", [ - ("get", "GET", "", {}), - ("delete", "DELETE", "/terminate", {"terminate": True}), + ("get", "GET", "", {}, models.PipelineStatus.RUNNING), + ("pause", "POST", "/pause", {}, models.PipelineStatus.PAUSING), + ("resume", "POST", "/resume", {}, models.PipelineStatus.RESUMING), + ( + "delete", + "DELETE", + "/terminate", + {"terminate": True}, + models.PipelineStatus.TERMINATING, + ), ], ) def test_lifecycle_operations( @@ -92,6 +101,7 @@ def test_lifecycle_operations( endpoint, params, get_pipeline_response, + status, ): """Test common pipeline lifecycle operations.""" with patch( @@ -106,15 +116,16 @@ def test_lifecycle_operations( assert result is None else: assert result == pipeline + assert pipeline.status == status - @pytest.mark.parametrize("operation", ["get", "delete"]) + @pytest.mark.parametrize("operation", ["get", "delete", "pause", "resume"]) def test_lifecycle_not_found(self, pipeline, mock_not_found_response, operation): """Test lifecycle operations when pipeline is not found.""" with patch("httpx.Client.request", return_value=mock_not_found_response): with pytest.raises(errors.PipelineNotFoundError): getattr(pipeline, operation)() - @pytest.mark.parametrize("operation", ["get", "delete"]) + @pytest.mark.parametrize("operation", ["get", "delete", "pause", "resume"]) def test_lifecycle_connection_error( self, pipeline, mock_connection_error, operation ): @@ -313,7 +324,7 @@ def test_health_success(self, pipeline, mock_success_response): expected = { "pipeline_id": "test-pipeline", "pipeline_name": "Test Pipeline", - "overall_status": "Running", + "overall_status": "Terminating", "created_at": "2025-08-31T16:05:09.163872763Z", "updated_at": "2025-08-31T16:05:10.638243216Z", } @@ -328,3 +339,4 @@ def test_health_success(self, pipeline, mock_success_response): f"{pipeline.ENDPOINT}/{pipeline.pipeline_id}/health", ) assert result == expected + assert pipeline.status == models.PipelineStatus.TERMINATING