diff --git a/app/core/config.py b/app/core/config.py index a122364..5e6fd09 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -73,6 +73,7 @@ class Settings(BaseSettings): artifact_storage_host: str = Field(default="localhost", alias="ARTIFACT_STORAGE_HOST") artifact_storage_grpc_port: int = Field(default=50051, alias="ARTIFACT_STORAGE_GRPC_PORT") artifact_storage_http_port: int = Field(default=8080, alias="ARTIFACT_STORAGE_HTTP_PORT") + artifact_storage_api_key: str = Field(default="dev-api-key", alias="ARTIFACT_STORAGE_API_KEY") @field_validator("cors_origins", mode="plain") @classmethod diff --git a/app/core/grpc_clients.py b/app/core/grpc_clients.py index 4ce907d..f6031ce 100644 --- a/app/core/grpc_clients.py +++ b/app/core/grpc_clients.py @@ -21,14 +21,22 @@ async def get_artifact_storage_client() -> ArtifactStorageClient: """ global _artifact_storage_client - if _artifact_storage_client is None: + # If client doesn't exist or isn't connected, initialize it + if _artifact_storage_client is None or _artifact_storage_client.stub is None: settings = get_settings() - _artifact_storage_client = ArtifactStorageClient( + client = ArtifactStorageClient( host=settings.artifact_storage_host, - port=settings.artifact_storage_grpc_port + port=settings.artifact_storage_grpc_port, + api_key=settings.artifact_storage_api_key ) - await _artifact_storage_client.connect() - logger.info("Artifact Storage gRPC client initialized") + try: + await client.connect() + _artifact_storage_client = client + logger.info("Artifact Storage gRPC client initialized") + except Exception as e: + logger.error(f"Failed to initialize Artifact Storage gRPC client: {e}") + _artifact_storage_client = None + raise return _artifact_storage_client diff --git a/app/grpc/clients/artifact_storage_client.py b/app/grpc/clients/artifact_storage_client.py index 0a83e41..679fb93 100644 --- a/app/grpc/clients/artifact_storage_client.py +++ b/app/grpc/clients/artifact_storage_client.py @@ -19,15 +19,17 @@ class ArtifactStorageClient: - Listing artifacts """ - def __init__(self, host: str, port: int): + def __init__(self, host: str, port: int, api_key: str): """ Initialize Artifact Storage gRPC client. Args: host: Artifact Storage service hostname port: gRPC port (typically 50051) + api_key: Admin API Key for authentication """ self.address = f"{host}:{port}" + self.api_key = api_key self.channel: Optional[grpc.aio.Channel] = None self.stub = None @@ -90,7 +92,8 @@ async def get_upload_url( project_id=project_id or "" ) - response = await self.stub.GetUploadURL(request) + metadata = (("x-api-key", self.api_key),) + response = await self.stub.GetUploadURL(request, metadata=metadata) return { "artifact_id": response.artifact_id, @@ -132,7 +135,8 @@ async def complete_upload( version_id=version_id ) - response = await self.stub.CompleteUpload(request) + metadata = (("x-api-key", self.api_key),) + response = await self.stub.CompleteUpload(request, metadata=metadata) return response.status == "completed" except grpc.RpcError as e: @@ -165,7 +169,8 @@ async def get_download_url( expires_in_seconds=expires_in_seconds ) - response = await self.stub.GetDownloadURL(request) + metadata = (("x-api-key", self.api_key),) + response = await self.stub.GetDownloadURL(request, metadata=metadata) return response.download_url except grpc.RpcError as e: @@ -190,7 +195,8 @@ async def get_metadata(self, artifact_id: str) -> Dict[str, Any]: request = artifact_pb2.GetMetadataRequest(artifact_id=artifact_id) - response = await self.stub.GetMetadata(request) + metadata = (("x-api-key", self.api_key),) + response = await self.stub.GetMetadata(request, metadata=metadata) return { "id": response.id, @@ -237,7 +243,8 @@ async def list_artifacts( pipeline_id=pipeline_id or "" ) - response = await self.stub.ListArtifacts(request) + metadata = (("x-api-key", self.api_key),) + response = await self.stub.ListArtifacts(request, metadata=metadata) artifacts = [] for artifact in response.artifacts: