Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class Settings(BaseSettings):
artifact_storage_host: str = Field(default="localhost", alias="ARTIFACT_STORAGE_HOST")
artifact_storage_grpc_port: int = Field(default=50051, alias="ARTIFACT_STORAGE_GRPC_PORT")
artifact_storage_http_port: int = Field(default=8080, alias="ARTIFACT_STORAGE_HTTP_PORT")
artifact_storage_api_key: str = Field(default="dev-api-key", alias="ARTIFACT_STORAGE_API_KEY")

@field_validator("cors_origins", mode="plain")
@classmethod
Expand Down
18 changes: 13 additions & 5 deletions app/core/grpc_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,22 @@ async def get_artifact_storage_client() -> ArtifactStorageClient:
"""
global _artifact_storage_client

if _artifact_storage_client is None:
# If client doesn't exist or isn't connected, initialize it
if _artifact_storage_client is None or _artifact_storage_client.stub is None:
settings = get_settings()
_artifact_storage_client = ArtifactStorageClient(
client = ArtifactStorageClient(
host=settings.artifact_storage_host,
port=settings.artifact_storage_grpc_port
port=settings.artifact_storage_grpc_port,
api_key=settings.artifact_storage_api_key
)
await _artifact_storage_client.connect()
logger.info("Artifact Storage gRPC client initialized")
try:
await client.connect()
_artifact_storage_client = client
logger.info("Artifact Storage gRPC client initialized")
except Exception as e:
logger.error(f"Failed to initialize Artifact Storage gRPC client: {e}")
_artifact_storage_client = None
raise

return _artifact_storage_client

Expand Down
19 changes: 13 additions & 6 deletions app/grpc/clients/artifact_storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ class ArtifactStorageClient:
- Listing artifacts
"""

def __init__(self, host: str, port: int):
def __init__(self, host: str, port: int, api_key: str):
"""
Initialize Artifact Storage gRPC client.

Args:
host: Artifact Storage service hostname
port: gRPC port (typically 50051)
api_key: Admin API Key for authentication
"""
self.address = f"{host}:{port}"
self.api_key = api_key
self.channel: Optional[grpc.aio.Channel] = None
self.stub = None

Expand Down Expand Up @@ -90,7 +92,8 @@ async def get_upload_url(
project_id=project_id or ""
)

response = await self.stub.GetUploadURL(request)
metadata = (("x-api-key", self.api_key),)
response = await self.stub.GetUploadURL(request, metadata=metadata)

return {
"artifact_id": response.artifact_id,
Expand Down Expand Up @@ -132,7 +135,8 @@ async def complete_upload(
version_id=version_id
)

response = await self.stub.CompleteUpload(request)
metadata = (("x-api-key", self.api_key),)
response = await self.stub.CompleteUpload(request, metadata=metadata)

return response.status == "completed"
except grpc.RpcError as e:
Expand Down Expand Up @@ -165,7 +169,8 @@ async def get_download_url(
expires_in_seconds=expires_in_seconds
)

response = await self.stub.GetDownloadURL(request)
metadata = (("x-api-key", self.api_key),)
response = await self.stub.GetDownloadURL(request, metadata=metadata)

return response.download_url
except grpc.RpcError as e:
Expand All @@ -190,7 +195,8 @@ async def get_metadata(self, artifact_id: str) -> Dict[str, Any]:

request = artifact_pb2.GetMetadataRequest(artifact_id=artifact_id)

response = await self.stub.GetMetadata(request)
metadata = (("x-api-key", self.api_key),)
response = await self.stub.GetMetadata(request, metadata=metadata)

return {
"id": response.id,
Expand Down Expand Up @@ -237,7 +243,8 @@ async def list_artifacts(
pipeline_id=pipeline_id or ""
)

response = await self.stub.ListArtifacts(request)
metadata = (("x-api-key", self.api_key),)
response = await self.stub.ListArtifacts(request, metadata=metadata)

artifacts = []
for artifact in response.artifacts:
Expand Down
Loading