diff --git a/libs/labelbox/src/labelbox/__init__.py b/libs/labelbox/src/labelbox/__init__.py
index 0979d1590..8eb450219 100644
--- a/libs/labelbox/src/labelbox/__init__.py
+++ b/libs/labelbox/src/labelbox/__init__.py
@@ -22,6 +22,13 @@
 )
 from labelbox.schema.dataset import Dataset
 from labelbox.schema.enums import AnnotationImportState
+from labelbox.schema.external_metrics import (
+    AutoQaStatus,
+    ExternalMetricsEntry,
+    ExternalMetricsResult,
+    Performance,
+    SubmittedBy,
+)
 from labelbox.schema.export_task import (
     BufferedJsonConverterOutput,
     ExportTask,
diff --git a/libs/labelbox/src/labelbox/schema/external_metrics.py b/libs/labelbox/src/labelbox/schema/external_metrics.py
new file mode 100644
index 000000000..5b8b16660
--- /dev/null
+++ b/libs/labelbox/src/labelbox/schema/external_metrics.py
@@ -0,0 +1,65 @@
+from enum import Enum
+from typing import Any, Dict, Optional
+
+from pydantic import BaseModel
+
+
+class AutoQaStatus(str, Enum):
+    Approve = "Approve"
+    Reject = "Reject"
+    Neutral = "Neutral"
+
+
+class SubmittedBy(BaseModel):
+    email: str
+    discord_id: Optional[str] = None
+
+
+class Performance(BaseModel):
+    auto_qa_status: AutoQaStatus
+    auto_qa_score: Optional[float] = None
+    auto_qa_feedback: Optional[str] = None
+    time_to_completion: Optional[float] = None
+
+
+class ExternalMetricsEntry(BaseModel):
+    task_id: str
+    submitted_by: SubmittedBy
+    performance: Performance
+    metadata: Optional[Dict[str, Any]] = None
+    submitted_on: Optional[str] = None
+
+
+class ExternalMetricsResult(BaseModel):
+    task_id: str
+    data_row_id: str
+    label_id: str
+
+
+def _to_gql_input(entry: ExternalMetricsEntry) -> Dict[str, Any]:
+    """Convert an ExternalMetricsEntry to a camelCase dict matching the GQL schema."""
+    submitted_by: Dict[str, Any] = {"email": entry.submitted_by.email}
+    if entry.submitted_by.discord_id is not None:
+        submitted_by["discordId"] = entry.submitted_by.discord_id
+
+    performance: Dict[str, Any] = {
+        "autoQaStatus": entry.performance.auto_qa_status.value,
+    }
+    if entry.performance.auto_qa_score is not None:
+        performance["autoQaScore"] = entry.performance.auto_qa_score
+    if entry.performance.auto_qa_feedback is not None:
+        performance["autoQaFeedback"] = entry.performance.auto_qa_feedback
+    if entry.performance.time_to_completion is not None:
+        performance["timeToCompletion"] = entry.performance.time_to_completion
+
+    result: Dict[str, Any] = {
+        "taskId": entry.task_id,
+        "submittedBy": submitted_by,
+        "performance": performance,
+    }
+    if entry.metadata is not None:
+        result["metadata"] = entry.metadata
+    if entry.submitted_on is not None:
+        result["submittedOn"] = entry.submitted_on
+
+    return result
diff --git a/libs/labelbox/src/labelbox/schema/project.py b/libs/labelbox/src/labelbox/schema/project.py
index 206d3c43d..c455001ea 100644
--- a/libs/labelbox/src/labelbox/schema/project.py
+++ b/libs/labelbox/src/labelbox/schema/project.py
@@ -37,6 +37,11 @@
     ProjectExportFilters,
     build_filters,
 )
+from labelbox.schema.external_metrics import (
+    ExternalMetricsEntry,
+    ExternalMetricsResult,
+    _to_gql_input,
+)
 from labelbox.schema.export_params import ProjectExportParams
 from labelbox.schema.export_task import ExportTask
 from labelbox.schema.identifiable import DataRowIdentifier
@@ -1001,6 +1006,47 @@ def create_batches(
 
         return CreateBatchesTask(self.client, self.uid, batch_ids, task_ids)
 
+    def submit_external_metrics(
+        self,
+        entries: List[ExternalMetricsEntry],
+    ) -> List[ExternalMetricsResult]:
+        """Submits external QA metrics for tasks in this project.
+
+        Args:
+            entries: A list of ExternalMetricsEntry objects containing task metrics.
+
+        Returns:
+            A list of ExternalMetricsResult objects with task, data row, and label IDs.
+        """
+        mutation_str = """mutation submitExternalMetricsPyApi($input: SubmitExternalMetricsInput!) {
+            submitExternalMetrics(input: $input) {
+                results {
+                    taskId
+                    dataRowId
+                    labelId
+                }
+            }
+        }"""
+
+        params = {
+            "input": {
+                "projectId": self.uid,
+                "entries": [_to_gql_input(e) for e in entries],
+            }
+        }
+
+        response = self.client.execute(mutation_str, params)
+        results = response["submitExternalMetrics"]["results"]
+
+        return [
+            ExternalMetricsResult(
+                task_id=r["taskId"],
+                data_row_id=r["dataRowId"],
+                label_id=r["labelId"],
+            )
+            for r in results
+        ]
+
     def create_batches_from_dataset(
         self,
         name_prefix: str,
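
A minimal usage sketch of the new `Project.submit_external_metrics` API (illustrative, not part of the diff): it assumes a valid API key and an existing project whose tasks match the submitted task IDs; the IDs and email below are placeholders.

    import labelbox as lb

    # Hypothetical credentials and IDs for illustration only.
    client = lb.Client(api_key="<API_KEY>")
    project = client.get_project("<PROJECT_ID>")

    # Build one metrics entry; optional fields (score, feedback,
    # time_to_completion, metadata) may be omitted.
    entry = lb.ExternalMetricsEntry(
        task_id="<TASK_ID>",
        submitted_by=lb.SubmittedBy(email="reviewer@example.com"),
        performance=lb.Performance(
            auto_qa_status=lb.AutoQaStatus.Approve,
            auto_qa_score=0.95,
            time_to_completion=42.0,
        ),
    )

    results = project.submit_external_metrics([entry])
    for r in results:
        print(r.task_id, r.data_row_id, r.label_id)

Note that `_to_gql_input` stays private: callers work with the snake_case pydantic models, and the helper renames fields to the camelCase keys the mutation expects (`taskId`, `autoQaScore`, `timeToCompletion`), dropping any optional field left as `None`.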