Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.0.1
3.0.2
3 changes: 2 additions & 1 deletion src/glassflow/etl/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
JoinSourceConfigPatch,
JoinType,
)
from .pipeline import PipelineConfig, PipelineConfigPatch
from .pipeline import PipelineConfig, PipelineConfigPatch, PipelineStatus
from .sink import SinkConfig, SinkConfigPatch, SinkType, TableMapping
from .source import (
ConsumerGroupOffset,
Expand Down Expand Up @@ -40,6 +40,7 @@
"JoinType",
"PipelineConfig",
"PipelineConfigPatch",
"PipelineStatus",
"SinkConfig",
"SinkType",
"TableMapping",
Expand Down
12 changes: 12 additions & 0 deletions src/glassflow/etl/models/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,24 @@
from pydantic import BaseModel, Field, field_validator, model_validator

from ..errors import InvalidDataTypeMappingError
from .base import CaseInsensitiveStrEnum
from .data_types import kafka_to_clickhouse_data_type_mappings
from .join import JoinConfig, JoinConfigPatch
from .sink import SinkConfig, SinkConfigPatch
from .source import SourceConfig, SourceConfigPatch


class PipelineStatus(CaseInsensitiveStrEnum):
CREATED = "Created"
RUNNING = "Running"
PAUSING = "Pausing"
PAUSED = "Paused"
RESUMING = "Resuming"
TERMINATING = "Terminating"
TERMINATED = "Terminated"
FAILED = "Failed"


class PipelineConfig(BaseModel):
pipeline_id: str
name: Optional[str] = Field(default=None)
Expand Down
8 changes: 7 additions & 1 deletion src/glassflow/etl/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(
self.config = None

self._dlq = DLQ(pipeline_id=self.pipeline_id, host=host)
self.status: models.PipelineStatus | None = None

def get(self) -> Pipeline:
"""Fetch a pipeline by its ID.
Expand All @@ -67,6 +68,7 @@ def get(self) -> Pipeline:
"GET", f"{self.ENDPOINT}/{self.pipeline_id}", event_name="PipelineGet"
)
self.config = models.PipelineConfig.model_validate(response.json())
self.status = models.PipelineStatus(response.json()["status"])
self._dlq = DLQ(pipeline_id=self.pipeline_id, host=self.host)
return self

Expand Down Expand Up @@ -94,6 +96,7 @@ def create(self) -> Pipeline:
),
event_name="PipelineCreated",
)
self.status = models.PipelineStatus.CREATED
return self

except errors.ForbiddenError as e:
Expand Down Expand Up @@ -160,6 +163,7 @@ def delete(self, terminate: bool = True) -> None:
self.get()
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/terminate"
self._request("DELETE", endpoint, event_name="PipelineDeleted")
self.status = models.PipelineStatus.TERMINATING

def pause(self) -> Pipeline:
"""Pauses the pipeline with the given ID.
Expand Down Expand Up @@ -191,11 +195,13 @@ def health(self) -> dict[str, Any]:
Returns:
dict: Pipeline health
"""
return self._request(
response = self._request(
"GET",
f"{self.ENDPOINT}/{self.pipeline_id}/health",
event_name="PipelineHealth",
).json()
self.status = models.PipelineStatus(response["overall_status"])
return response

def to_dict(self) -> dict[str, Any]:
"""Convert the pipeline configuration to a dictionary.
Expand Down
12 changes: 10 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ def valid_config() -> dict:
return pipeline_configs.get_valid_pipeline_config()


@pytest.fixture
def get_pipeline_response(valid_config) -> dict:
"""Fixture for a valid pipeline configuration with status."""
config = valid_config
config["status"] = "Running"
return config


@pytest.fixture
def valid_config_without_joins() -> dict:
"""Fixture for a valid pipeline configuration without joins."""
Expand Down Expand Up @@ -88,11 +96,11 @@ def mock_connection_error():


@pytest.fixture
def mock_success_get_pipeline(valid_config):
def mock_success_get_pipeline(get_pipeline_response):
"""Fixture for a successful GET pipeline response."""
return mock_responses.create_mock_response_factory()(
status_code=200,
json_data=valid_config,
json_data=get_pipeline_response,
)


Expand Down
6 changes: 4 additions & 2 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ def test_client_init(self):
assert client.host == "https://example.com"
assert client.http_client.base_url == "https://example.com"

def test_client_get_pipeline_success(self, valid_config, mock_success_response):
def test_client_get_pipeline_success(
self, get_pipeline_response, mock_success_response
):
"""Test successful pipeline retrieval by ID."""
client = Client()
pipeline_id = "test-pipeline-id"

mock_success_response.json.return_value = valid_config
mock_success_response.json.return_value = get_pipeline_response

with patch(
"httpx.Client.request", return_value=mock_success_response
Expand Down
4 changes: 2 additions & 2 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ def test_lifecycle_operations(
method,
endpoint,
params,
valid_config,
get_pipeline_response,
):
"""Test common pipeline lifecycle operations."""
with patch(
"httpx.Client.request", return_value=mock_success_response
) as mock_request:
if method == "GET":
mock_request.return_value.json.return_value = valid_config
mock_request.return_value.json.return_value = get_pipeline_response
result = getattr(pipeline, operation)(**params)
expected_endpoint = f"{pipeline.ENDPOINT}/{pipeline.pipeline_id}{endpoint}"
mock_request.assert_called_once_with(method, expected_endpoint)
Expand Down