Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 162 additions & 1 deletion app/api/v1/endpoints/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,28 @@ async def get_artifact_metadata(
async def list_artifacts(
project_id: Optional[str] = Query(None, description="Filter by project ID"),
pipeline_id: Optional[str] = Query(None, description="Filter by pipeline ID"),
include_versions: bool = Query(False, description="Include version information"),
current_user: User = Depends(deps.get_current_user),
client: ArtifactStorageClient = Depends(get_artifact_storage_client)
):
"""
List artifacts with optional filters.
List artifacts with optional filters and version information.
"""
try:
artifacts = await client.list_artifacts(
project_id=project_id,
pipeline_id=pipeline_id
)

# If version information requested, fetch versions for each artifact
if include_versions:
for artifact in artifacts:
try:
artifact["versions"] = await client.get_versions(artifact["id"])
except Exception:
# If we can't get versions, don't fail the entire request
artifact["versions"] = []

return artifacts
except Exception as e:
raise HTTPException(
Expand Down Expand Up @@ -367,3 +378,153 @@ async def complete_multipart_upload(
status_code=500,
detail=f"Failed to complete multipart upload: {str(e)}"
)


# Version Management Request/Response Models
class ArtifactVersion(BaseModel):
"""Artifact version information."""
version_id: Optional[str] = Field(None, description="S3 version ID")
size: int = Field(..., description="Version size in bytes")
checksum_sha256: Optional[str] = Field(None, description="SHA256 checksum")
created_at: Optional[str] = Field(None, description="Creation timestamp")
is_latest: bool = Field(False, description="Whether this is the latest version")


class RestoreArtifactRequest(BaseModel):
"""Request to restore an artifact."""
version_id: Optional[str] = Field(None, description="Target version ID (optional)")


class RestoreArtifactResponse(BaseModel):
"""Response with restoration status."""
status: str = Field(..., description="Restoration status")
restored_version_id: Optional[str] = Field(None, description="Restored version ID")
restoration_time: Optional[str] = Field(None, description="Restoration timestamp")


# Version Management Endpoints
@router.get("/{artifact_id}/versions", response_model=list[ArtifactVersion], status_code=status.HTTP_200_OK)
async def get_artifact_versions(
artifact_id: str,
current_user: User = Depends(deps.get_current_user),
client: ArtifactStorageClient = Depends(get_artifact_storage_client)
):
"""
Get version history for an artifact.
"""
try:
versions = await client.get_versions(artifact_id)
return versions
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to get artifact versions: {str(e)}"
)


@router.post("/{artifact_id}/restore", response_model=RestoreArtifactResponse, status_code=status.HTTP_200_OK)
async def restore_artifact(
artifact_id: str,
request: RestoreArtifactRequest,
current_user: User = Depends(deps.get_current_user),
client: ArtifactStorageClient = Depends(get_artifact_storage_client)
):
"""
Restore an artifact to a specific version.
"""
try:
result = await client.restore_artifact(
artifact_id=artifact_id,
version_id=request.version_id
)
return result
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to restore artifact: {str(e)}"
)


# Retention Policy Request/Response Models
class SetRetentionPolicyRequest(BaseModel):
"""Request to set project retention policy."""
project_id: str = Field(..., description="Project ID")
max_bytes: int = Field(..., description="Maximum storage bytes", ge=0)
ttl_days: int = Field(..., description="Default TTL in days", ge=1)


class RetentionPolicyResponse(BaseModel):
"""Response with retention policy details."""
project_id: str = Field(..., description="Project ID")
max_bytes: Optional[int] = Field(None, description="Maximum storage bytes")
ttl_days: Optional[int] = Field(None, description="Default TTL in days")
current_usage: Optional[int] = Field(None, description="Current usage in bytes")


class StorageClassInfo(BaseModel):
"""Storage class information."""
name: str = Field(..., description="Storage class name")
description: str = Field(..., description="Storage class description")
cost_per_gb: Optional[float] = Field(None, description="Cost per GB")


# Retention Policy Endpoints
@router.put("/projects/{project_id}/retention", response_model=RetentionPolicyResponse, status_code=status.HTTP_200_OK)
async def set_retention_policy(
project_id: str,
request: SetRetentionPolicyRequest,
current_user: User = Depends(deps.get_current_user),
client: ArtifactStorageClient = Depends(get_artifact_storage_client)
):
"""
Set retention policy for a project.
"""
try:
result = await client.set_retention_policy(
project_id=project_id,
max_bytes=request.max_bytes,
ttl_days=request.ttl_days
)
return result
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to set retention policy: {str(e)}"
)


@router.get("/projects/{project_id}/retention", response_model=RetentionPolicyResponse, status_code=status.HTTP_200_OK)
async def get_retention_policy(
project_id: str,
current_user: User = Depends(deps.get_current_user),
client: ArtifactStorageClient = Depends(get_artifact_storage_client)
):
"""
Get retention policy for a project.
"""
try:
result = await client.get_retention_policy(project_id)
return result
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to get retention policy: {str(e)}"
)


@router.get("/storage-classes", response_model=list[StorageClassInfo], status_code=status.HTTP_200_OK)
async def get_storage_classes(
current_user: User = Depends(deps.get_current_user),
client: ArtifactStorageClient = Depends(get_artifact_storage_client)
):
"""
Get available storage classes.
"""
try:
result = await client.get_storage_classes()
return result
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to get storage classes: {str(e)}"
)
163 changes: 163 additions & 0 deletions app/grpc/clients/artifact_storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,169 @@ async def complete_multipart_upload(self, artifact_id: str, upload_id: str, part
logger.error(f"gRPC error completing multipart upload: {e.code()} - {e.details()}")
raise

async def get_versions(self, artifact_id: str) -> list:
"""
Get version history for an artifact.

Args:
artifact_id: Artifact ID to get versions for

Returns:
List of artifact versions with metadata
"""
if not self.stub:
raise RuntimeError("Client not connected. Call connect() first.")

try:
from app.grpc.generated import artifact_pb2

request = artifact_pb2.GetVersionsRequest(artifact_id=artifact_id)

metadata = (("x-api-key", self.api_key),)
response = await self.stub.GetVersions(request, metadata=metadata)

versions = []
for version in response.versions:
versions.append({
"version_id": version.version_id if version.HasField("version_id") else None,
"size": version.size,
"checksum_sha256": version.checksum_sha256 if version.HasField("checksum_sha256") else None,
"created_at": version.created_at.ToDatetime().isoformat() if version.HasField("created_at") else None,
"is_latest": version.is_latest if version.HasField("is_latest") else False
})

return versions
except grpc.RpcError as e:
logger.error(f"gRPC error getting artifact versions: {e.code()} - {e.details()}")
raise

async def restore_artifact(self, artifact_id: str, version_id: str = None) -> Dict[str, Any]:
"""
Restore an artifact to a specific version.

Args:
artifact_id: Artifact ID to restore
version_id: Target version ID (optional)

Returns:
Dict with restoration status and details
"""
if not self.stub:
raise RuntimeError("Client not connected. Call connect() first.")

try:
from app.grpc.generated import artifact_pb2

request = artifact_pb2.RestoreArtifactRequest(
artifact_id=artifact_id,
version_id=version_id
)

metadata = (("x-api-key", self.api_key),)
response = await self.stub.RestoreArtifact(request, metadata=metadata)

return {
"status": response.status,
"restored_version_id": response.restored_version_id if response.HasField("restored_version_id") else None,
"restoration_time": response.restoration_time.ToDatetime().isoformat() if response.HasField("restoration_time") else None
}
except grpc.RpcError as e:
logger.error(f"gRPC error restoring artifact: {e.code()} - {e.details()}")
raise

async def set_retention_policy(self, project_id: str, max_bytes: int, ttl_days: int) -> Dict[str, Any]:
"""
Set retention policy for a project.

Args:
project_id: Project ID to set policy for
max_bytes: Maximum storage bytes allowed
ttl_days: Default TTL for artifacts in days

Returns:
Dict with policy status and details
"""
if not self.stub:
raise RuntimeError("Client not connected. Call connect() first.")

try:
from app.grpc.generated import artifact_pb2

request = artifact_pb2.SetRetentionPolicyRequest(
project_id=project_id,
max_bytes=max_bytes,
ttl_days=ttl_days
)

metadata = (("x-api-key", self.api_key),)
response = await self.stub.SetRetentionPolicy(request, metadata=metadata)

return {
"status": response.status,
"project_id": response.project_id,
"max_bytes": response.max_bytes if response.HasField("max_bytes") else None,
"ttl_days": response.ttl_days if response.HasField("ttl_days") else None
}
except grpc.RpcError as e:
logger.error(f"gRPC error setting retention policy: {e.code()} - {e.details()}")
raise

async def get_retention_policy(self, project_id: str) -> Dict[str, Any]:
"""
Get retention policy for a project.

Args:
project_id: Project ID to get policy for

Returns:
Dict with policy details
"""
if not self.stub:
raise RuntimeError("Client not connected. Call connect() first.")

try:
from app.grpc.generated import artifact_pb2

request = artifact_pb2.GetRetentionPolicyRequest(project_id=project_id)

metadata = (("x-api-key", self.api_key),)
response = await self.stub.GetRetentionPolicy(request, metadata=metadata)

return {
"project_id": response.project_id,
"max_bytes": response.max_bytes if response.HasField("max_bytes") else None,
"ttl_days": response.ttl_days if response.HasField("ttl_days") else None,
"current_usage": response.current_usage if response.HasField("current_usage") else None
}
except grpc.RpcError as e:
logger.error(f"gRPC error getting retention policy: {e.code()} - {e.details()}")
raise

async def get_storage_classes(self) -> Dict[str, Any]:
"""
Get available storage classes.

Returns:
Dict with available storage classes
"""
if not self.stub:
raise RuntimeError("Client not connected. Call connect() first.")

try:
from app.grpc.generated import artifact_pb2

request = artifact_pb2.GetStorageClassesRequest()

metadata = (("x-api-key", self.api_key),)
response = await self.stub.GetStorageClasses(request, metadata=metadata)

return {
"storage_classes": list(response.storage_classes)
}
except grpc.RpcError as e:
logger.error(f"gRPC error getting storage classes: {e.code()} - {e.details()}")
raise

async def close(self):
"""Close gRPC connection."""
if self.channel:
Expand Down
Loading
Loading