Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions libs/labelbox/src/labelbox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@
)
from labelbox.schema.dataset import Dataset
from labelbox.schema.enums import AnnotationImportState
from labelbox.schema.external_metrics import (
AutoQaStatus,
ExternalMetricsEntry,
ExternalMetricsResult,
Performance,
SubmittedBy,
)
from labelbox.schema.export_task import (
BufferedJsonConverterOutput,
ExportTask,
Expand Down
65 changes: 65 additions & 0 deletions libs/labelbox/src/labelbox/schema/external_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from enum import Enum
from typing import Any, Dict, List, Optional

from pydantic import BaseModel


class AutoQaStatus(str, Enum):
Approve = "Approve"
Reject = "Reject"
Neutral = "Neutral"


class SubmittedBy(BaseModel):
email: str
discord_id: Optional[str] = None


class Performance(BaseModel):
auto_qa_status: AutoQaStatus
auto_qa_score: Optional[float] = None
auto_qa_feedback: Optional[str] = None
time_to_completion: Optional[float] = None


class ExternalMetricsEntry(BaseModel):
task_id: str
submitted_by: SubmittedBy
performance: Performance
metadata: Optional[Dict[str, Any]] = None
submitted_on: Optional[str] = None


class ExternalMetricsResult(BaseModel):
task_id: str
data_row_id: str
label_id: str


def _to_gql_input(entry: ExternalMetricsEntry) -> Dict[str, Any]:
"""Convert an ExternalMetricsEntry to a camelCase dict matching the GQL schema."""
submitted_by: Dict[str, Any] = {"email": entry.submitted_by.email}
if entry.submitted_by.discord_id is not None:
submitted_by["discordId"] = entry.submitted_by.discord_id

performance: Dict[str, Any] = {
"autoQaStatus": entry.performance.auto_qa_status.value,
}
if entry.performance.auto_qa_score is not None:
performance["autoQaScore"] = entry.performance.auto_qa_score
if entry.performance.auto_qa_feedback is not None:
performance["autoQaFeedback"] = entry.performance.auto_qa_feedback
if entry.performance.time_to_completion is not None:
performance["timeToCompletion"] = entry.performance.time_to_completion

result: Dict[str, Any] = {
"taskId": entry.task_id,
"submittedBy": submitted_by,
"performance": performance,
}
if entry.metadata is not None:
result["metadata"] = entry.metadata
if entry.submitted_on is not None:
result["submittedOn"] = entry.submitted_on

return result
46 changes: 46 additions & 0 deletions libs/labelbox/src/labelbox/schema/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
ProjectExportFilters,
build_filters,
)
from labelbox.schema.external_metrics import (
ExternalMetricsEntry,
ExternalMetricsResult,
_to_gql_input,
)
from labelbox.schema.export_params import ProjectExportParams
from labelbox.schema.export_task import ExportTask
from labelbox.schema.identifiable import DataRowIdentifier
Expand Down Expand Up @@ -1001,6 +1006,47 @@ def create_batches(

return CreateBatchesTask(self.client, self.uid, batch_ids, task_ids)

def submit_external_metrics(
self,
entries: List[ExternalMetricsEntry],
) -> List[ExternalMetricsResult]:
"""Submits external QA metrics for tasks in this project.

Args:
entries: A list of ExternalMetricsEntry objects containing task metrics.

Returns:
A list of ExternalMetricsResult objects with task, data row, and label IDs.
"""
mutation_str = """mutation submitExternalMetricsPyApi($input: SubmitExternalMetricsInput!) {
submitExternalMetrics(input: $input) {
results {
taskId
dataRowId
labelId
}
}
}"""

params = {
"input": {
"projectId": self.uid,
"entries": [_to_gql_input(e) for e in entries],
}
}

response = self.client.execute(mutation_str, params)
results = response["submitExternalMetrics"]["results"]

return [
ExternalMetricsResult(
task_id=r["taskId"],
data_row_id=r["dataRowId"],
label_id=r["labelId"],
)
for r in results
]

def create_batches_from_dataset(
self,
name_prefix: str,
Expand Down
Loading