diff --git a/docs/reference/experimental/async/curator.md b/docs/reference/experimental/async/curator.md index 712027e3a..2f0fd6f7e 100644 --- a/docs/reference/experimental/async/curator.md +++ b/docs/reference/experimental/async/curator.md @@ -56,6 +56,7 @@ at your own risk. members: - create_async - export_to_record_set_async + - synchronize_async - download_csv_async - import_csv_async --- diff --git a/docs/reference/experimental/sync/curator.md b/docs/reference/experimental/sync/curator.md index bed7b1cc4..1df3aae24 100644 --- a/docs/reference/experimental/sync/curator.md +++ b/docs/reference/experimental/sync/curator.md @@ -56,6 +56,7 @@ at your own risk. members: - create - export_to_record_set + - synchronize - download_csv - import_csv --- diff --git a/synapseclient/core/constants/concrete_types.py b/synapseclient/core/constants/concrete_types.py index b2a635f39..d01cc38a9 100644 --- a/synapseclient/core/constants/concrete_types.py +++ b/synapseclient/core/constants/concrete_types.py @@ -152,6 +152,7 @@ LIST_GRID_SESSIONS_RESPONSE = ( "org.sagebionetworks.repo.model.grid.ListGridSessionsResponse" ) +SYNCHRONIZE_GRID_REQUEST = "org.sagebionetworks.repo.model.grid.SynchronizeGridRequest" GRID_CSV_IMPORT_REQUEST = "org.sagebionetworks.repo.model.grid.GridCsvImportRequest" UPLOAD_TO_TABLE_PREVIEW_REQUEST = ( "org.sagebionetworks.repo.model.table.UploadToTablePreviewRequest" diff --git a/synapseclient/models/curation.py b/synapseclient/models/curation.py index 032952a81..6e0aedfcf 100644 --- a/synapseclient/models/curation.py +++ b/synapseclient/models/curation.py @@ -40,6 +40,7 @@ LIST_GRID_SESSIONS_REQUEST, LIST_GRID_SESSIONS_RESPONSE, RECORD_BASED_METADATA_TASK_PROPERTIES, + SYNCHRONIZE_GRID_REQUEST, UPLOAD_TO_TABLE_PREVIEW_REQUEST, ) from synapseclient.core.download.download_functions import download_from_url @@ -1311,6 +1312,53 @@ def to_synapse_request(self) -> Dict[str, Any]: return request_dict +@dataclass +class 
SynchronizeGridRequest(AsynchronousCommunicator): + """ + A request to synchronize a grid session. + + The request is modeled from: https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/grid/SynchronizeGridRequest.html + + The response is modeled from: https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/grid/SynchronizeGridResponse.html + """ + + grid_session_id: str + """The ID of the grid session to synchronize.""" + + concrete_type: str = field(default=SYNCHRONIZE_GRID_REQUEST) + """The concrete type for this request.""" + + error_messages: Optional[list[str]] = field(default=None, compare=False) + """Any error messages generated during the synchronization process.""" + + def fill_from_dict( + self, synapse_response: Dict[str, Any] + ) -> "SynchronizeGridRequest": + """ + Converts a response from the REST API into this dataclass. + + Arguments: + synapse_response: The response from the REST API. + + Returns: + The SynchronizeGridRequest object. + """ + self.error_messages = synapse_response.get("errorMessages", None) + return self + + def to_synapse_request(self) -> Dict[str, Any]: + """ + Converts this dataclass to a dictionary suitable for a Synapse REST API request. + + Returns: + A dictionary representation of this object for API requests. + """ + return { + "concreteType": self.concrete_type, + "gridSessionId": self.grid_session_id, + } + + @dataclass class GridSession: """ @@ -1572,6 +1620,50 @@ def export_to_record_set( """ return self + def synchronize( + self, *, timeout: int = 120, synapse_client: Optional[Synapse] = None + ) -> "Grid": + """ + Synchronizes the grid session's schema and row data against its source entity. + + This is intended for grid sessions created from a file view via `initial_query`. + Grid sessions backed by a RecordSet should use `export_to_record_set` instead. + + Arguments: + timeout: The number of seconds to wait for the job to complete or progress + before raising a SynapseTimeoutError. Defaults to 120. + synapse_client: If not passed in and caching was not disabled by + `Synapse.allow_client_caching(False)` this will use the last created + instance from the Synapse class constructor. 
+ + Returns: + Grid: The Grid object. + + Raises: + ValueError: If session_id is not provided. + + Example: Synchronize a grid session created from a file view +   + + ```python + from synapseclient import Synapse + from synapseclient.models import Grid + from synapseclient.models.table_components import Query + + syn = Synapse() + syn.login() + + # First create a grid session from a file view + query = Query(sql="SELECT * FROM syn1234567") + grid = Grid(initial_query=query) + grid = grid.create() + + # Synchronize the grid with the latest state of the file view + grid = grid.synchronize() + ``` + """ + return self + def delete(self, *, synapse_client: Optional[Synapse] = None) -> None: """ Delete the grid session. @@ -2447,3 +2539,69 @@ async def main(): url_is_presigned=True, synapse_client=synapse_client, ) + + @otel_trace_method( + method_to_trace_name=lambda self, **kwargs: f"Grid_Synchronize: ID: {self.session_id}" + ) + async def synchronize_async( + self, *, timeout: int = 120, synapse_client: Optional[Synapse] = None + ) -> "Grid": + """ + Synchronizes the grid session's schema and row data against its source entity. + + This is intended for grid sessions created from a file view via `initial_query`. + Grid sessions backed by a RecordSet should use `export_to_record_set` instead. + + Arguments: + timeout: The number of seconds to wait for the job to complete or progress + before raising a SynapseTimeoutError. Defaults to 120. + synapse_client: If not passed in and caching was not disabled by + `Synapse.allow_client_caching(False)` this will use the last created + instance from the Synapse class constructor. + + Returns: + Grid: The Grid object. + + Raises: + ValueError: If session_id is not provided. 
+ + Example: Synchronize a grid session created from a file view +   + + ```python + import asyncio + from synapseclient import Synapse + from synapseclient.models import Grid + from synapseclient.models.table_components import Query + + syn = Synapse() + syn.login() + + async def main(): + # First create a grid session from a file view + query = Query(sql="SELECT * FROM syn1234567") + grid = Grid(initial_query=query) + grid = await grid.create_async() + + # Synchronize the grid with the latest state of the file view + grid = await grid.synchronize_async() + + asyncio.run(main()) + ``` + """ + if not self.session_id: + raise ValueError("session_id is required to synchronize a GridSession") + + request = SynchronizeGridRequest(grid_session_id=self.session_id) + result = await request.send_job_and_wait_async( + timeout=timeout, synapse_client=synapse_client + ) + + if result.error_messages: + client = Synapse.get_client(synapse_client=synapse_client) + client.logger.error( + f"Grid session '{self.session_id}' synchronization completed with " + f"error messages: {result.error_messages}" + ) + + return self diff --git a/synapseclient/models/mixins/asynchronous_job.py b/synapseclient/models/mixins/asynchronous_job.py index 13eb4ee63..3110f892b 100644 --- a/synapseclient/models/mixins/asynchronous_job.py +++ b/synapseclient/models/mixins/asynchronous_job.py @@ -21,6 +21,7 @@ GRID_RECORD_SET_EXPORT_REQUEST, QUERY_BUNDLE_REQUEST, QUERY_TABLE_CSV_REQUEST, + SYNCHRONIZE_GRID_REQUEST, TABLE_UPDATE_TRANSACTION_REQUEST, UPLOAD_TO_TABLE_PREVIEW_REQUEST, ) @@ -36,6 +37,7 @@ DOWNLOAD_FROM_GRID_REQUEST: "/grid/download/csv/async", DOWNLOAD_LIST_MANIFEST_REQUEST: "/download/list/manifest/async", GRID_RECORD_SET_EXPORT_REQUEST: "/grid/export/recordset/async", + SYNCHRONIZE_GRID_REQUEST: "/grid/synchronize/async", TABLE_UPDATE_TRANSACTION_REQUEST: "/entity/{entityId}/table/transaction/async", GET_VALIDATION_SCHEMA_REQUEST: "/schema/type/validation/async", CREATE_SCHEMA_REQUEST: 
"/schema/type/create/async", diff --git a/tests/integration/synapseclient/models/async/test_grid_async.py b/tests/integration/synapseclient/models/async/test_grid_async.py index b17831812..75b297333 100644 --- a/tests/integration/synapseclient/models/async/test_grid_async.py +++ b/tests/integration/synapseclient/models/async/test_grid_async.py @@ -9,8 +9,20 @@ import pytest from synapseclient import Synapse -from synapseclient.models import File, Grid, Project, RecordSet -from tests.integration import ASYNC_JOB_TIMEOUT_SEC +from synapseclient.core.utils import make_bogus_data_file +from synapseclient.models import ( + EntityView, + File, + Folder, + Grid, + Project, + RecordSet, + ViewTypeMask, + query_async, +) +from synapseclient.models.table_components import Query +from tests.integration import ASYNC_JOB_TIMEOUT_SEC, QUERY_TIMEOUT_SEC +from tests.integration.helpers import wait_for_condition class TestGridAsync: @@ -21,6 +33,31 @@ def init(self, syn: Synapse, schedule_for_cleanup: Callable[..., None]) -> None: self.syn = syn self.schedule_for_cleanup = schedule_for_cleanup + @pytest.fixture(scope="function") + async def entity_view( + self, + project_model: Project, + syn: Synapse, + schedule_for_cleanup: Callable[..., None], + ) -> tuple[Folder, EntityView]: + """Create a folder with an associated EntityView for file-based testing.""" + # Create a folder + folder = await Folder( + name=str(uuid.uuid4()), + parent_id=project_model.id, + ).store_async(synapse_client=syn) + schedule_for_cleanup(folder.id) + + entity_view = await EntityView( + name=str(uuid.uuid4()), + parent_id=project_model.id, + scope_ids=[folder.id], + view_type_mask=ViewTypeMask.FILE.value, + ).store_async(synapse_client=syn) + schedule_for_cleanup(entity_view.id) + + return folder, entity_view + @pytest.fixture(scope="function") async def record_set_fixture(self, project_model: Project) -> RecordSet: """Create a RecordSet fixture for Grid testing.""" @@ -184,6 +221,62 @@ async def 
test_delete_grid_session_validation_error_async(self) -> None: ): await grid.delete_async(synapse_client=self.syn) + async def test_synchronize_grid_async( + self, + entity_view: tuple[Folder, EntityView], + ) -> None: + folder, ev = entity_view + + # GIVEN: A Grid session created at T0 from an empty EntityView + query = Query(sql=f"SELECT * FROM {ev.id}") + grid = Grid(initial_query=query) + created_grid = await grid.create_async(synapse_client=self.syn) + + try: + # AND: A file uploaded into the scoped folder + bogus_file = make_bogus_data_file() + self.schedule_for_cleanup(bogus_file) + uploaded_file = await File( + path=bogus_file, + parent_id=folder.id, + ).store_async(synapse_client=self.syn) + self.schedule_for_cleanup(uploaded_file.id) + + # Wait for the EntityView to index the new file + async def file_indexed() -> bool: + df = await query_async( + query=f"SELECT id FROM {ev.id} WHERE id = '{uploaded_file.id}'", + include_row_id_and_row_version=False, + synapse_client=self.syn, + ) + return not df.empty + + await wait_for_condition( + condition_fn=file_indexed, + timeout_seconds=QUERY_TIMEOUT_SEC, + ) + + # WHEN: Synchronizing the same session + synced_grid = await created_grid.synchronize_async(synapse_client=self.syn) + + # THEN: The session ID is unchanged + assert synced_grid.session_id == created_grid.session_id + assert synced_grid.source_entity_id == ev.id + + # AND: The downloaded CSV reflects the newly uploaded file + dest = tempfile.mkdtemp() + self.schedule_for_cleanup(dest) + csv_path = await synced_grid.download_csv_async( + destination=dest, + timeout=ASYNC_JOB_TIMEOUT_SEC, + synapse_client=self.syn, + ) + df = pd.read_csv(csv_path) + assert uploaded_file.id in df["id"].tolist() + finally: + if created_grid.session_id: + await created_grid.delete_async(synapse_client=self.syn) + async def test_import_csv_to_grid_session_async( self, record_set_fixture: RecordSet, diff --git a/tests/unit/synapseclient/models/async/unit_test_curation_async.py 
b/tests/unit/synapseclient/models/async/unit_test_curation_async.py index 33dad3953..1e25c0aa7 100644 --- a/tests/unit/synapseclient/models/async/unit_test_curation_async.py +++ b/tests/unit/synapseclient/models/async/unit_test_curation_async.py @@ -21,6 +21,7 @@ GridCsvImportRequest, GridRecordSetExportRequest, RecordBasedMetadataTaskProperties, + SynchronizeGridRequest, UploadToTablePreviewRequest, _create_task_properties_from_dict, ) @@ -1517,3 +1518,90 @@ async def test_download_csv_async_empty_file_handle_id(self): "the job may have failed silently.", ): await grid.download_csv_async(synapse_client=self.syn) + + +class TestSynchronizeGridRequest: + def test_to_synapse_request(self) -> None: + # GIVEN a SynchronizeGridRequest with all fields set + sync_req = SynchronizeGridRequest( + grid_session_id=SESSION_ID, + ) + + # WHEN I convert it to a synapse request + result = sync_req.to_synapse_request() + + # THEN it should contain the correct fields + assert "concreteType" in result + assert result["gridSessionId"] == SESSION_ID + + def test_fill_from_dict(self) -> None: + # GIVEN a response with synchronize grid session data + raw_response = { + "jobId": "1234", + "concreteType": "org.sagebionetworks.repo.model.grid.SynchronizeGridResponse", + "gridSessionId": SESSION_ID, + "errorMessages": ["test_error"], + } + + # WHEN I fill a SynchronizeGridRequest from the response + sync_req = SynchronizeGridRequest(grid_session_id=SESSION_ID) + response = sync_req.fill_from_dict(raw_response) + assert "test_error" in response.error_messages + assert response.grid_session_id == SESSION_ID + + +class TestSynchronizeGrid: + @pytest.fixture(autouse=True, scope="function") + def init_syn(self, syn: Synapse) -> None: + self.syn = syn + + async def test_synchronize_grid_async_without_session_id_raises(self) -> None: + # GIVEN a Grid without a session_id + grid = Grid() + + # WHEN I call synchronize_async + # THEN it should raise ValueError + with pytest.raises(ValueError, 
match="session_id is required to synchronize"): + await grid.synchronize_async(synapse_client=self.syn) + + async def test_synchronize_grid_async_empty_error(self) -> None: + # GIVEN a Grid with a session_id + grid = Grid(session_id=SESSION_ID) + mock_sync_response = SynchronizeGridRequest( + grid_session_id=SESSION_ID, + error_messages=[], + ) + + # WHEN I call synchronize_async + with patch( + "synapseclient.models.curation.SynchronizeGridRequest.send_job_and_wait_async", + new_callable=AsyncMock, + return_value=mock_sync_response, + ) as mock_sync: + await grid.synchronize_async(synapse_client=self.syn) + + # THEN the API should be called with the session_id + mock_sync.assert_called_once_with(synapse_client=self.syn, timeout=120) + + async def test_synchronize_grid_async_with_errors(self) -> None: + # GIVEN a Grid with a session_id + grid = Grid(session_id=SESSION_ID) + mock_sync_response = SynchronizeGridRequest( + grid_session_id=SESSION_ID, + error_messages=["sync_error_1", "sync_error_2"], + ) + + # WHEN I call synchronize_async + with patch( + "synapseclient.models.curation.SynchronizeGridRequest.send_job_and_wait_async", + new_callable=AsyncMock, + return_value=mock_sync_response, + ): + with patch.object(self.syn, "logger") as mock_logger: + await grid.synchronize_async(synapse_client=self.syn) + + # THEN the error messages should be logged as an error + mock_logger.error.assert_called_once() + error_message = mock_logger.error.call_args[0][0] + assert "sync_error_1" in error_message + assert "sync_error_2" in error_message