Skip to content
1 change: 1 addition & 0 deletions docs/reference/experimental/async/curator.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ at your own risk.
members:
- create_async
- export_to_record_set_async
- synchronize_async
- download_csv_async
- import_csv_async
---
Expand Down
1 change: 1 addition & 0 deletions docs/reference/experimental/sync/curator.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ at your own risk.
members:
- create
- export_to_record_set
- synchronize
- download_csv
- import_csv
---
Expand Down
1 change: 1 addition & 0 deletions synapseclient/core/constants/concrete_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
LIST_GRID_SESSIONS_RESPONSE = (
"org.sagebionetworks.repo.model.grid.ListGridSessionsResponse"
)
# Concrete type identifying an asynchronous request to synchronize a grid session.
SYNCHRONIZE_GRID_REQUEST = "org.sagebionetworks.repo.model.grid.SynchronizeGridRequest"
GRID_CSV_IMPORT_REQUEST = "org.sagebionetworks.repo.model.grid.GridCsvImportRequest"
UPLOAD_TO_TABLE_PREVIEW_REQUEST = (
"org.sagebionetworks.repo.model.table.UploadToTablePreviewRequest"
Expand Down
158 changes: 158 additions & 0 deletions synapseclient/models/curation.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
LIST_GRID_SESSIONS_REQUEST,
LIST_GRID_SESSIONS_RESPONSE,
RECORD_BASED_METADATA_TASK_PROPERTIES,
SYNCHRONIZE_GRID_REQUEST,
UPLOAD_TO_TABLE_PREVIEW_REQUEST,
)
from synapseclient.core.download.download_functions import download_from_url
Expand Down Expand Up @@ -1311,6 +1312,53 @@ def to_synapse_request(self) -> Dict[str, Any]:
return request_dict


@dataclass
class SynchronizeGridRequest(AsynchronousCommunicator):
"""
A request to synchronize a grid session.

The request is modeled from: <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/grid/SynchronizeGridRequest.html>

The response is modeled from: <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/grid/SynchronizeGridResponse.html>
"""

grid_session_id: str
"""The ID of the grid session to synchronize."""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it should be required, why have a default value? Should we call this grid_session_id to be consistent with the endpoint?


concrete_type: str = field(default=SYNCHRONIZE_GRID_REQUEST)
"""The concrete type for this request."""

error_messages: Optional[list[str]] = field(default=None, compare=False)
"""Any error messages generated during the synchronization process."""

def fill_from_dict(
    self, synapse_response: Dict[str, Any]
) -> "SynchronizeGridRequest":
    """
    Populates this object from a response returned by the REST API.

    Only the `errorMessages` key of the response is read; every other key is
    ignored and no other attribute of this object is modified.

    Arguments:
        synapse_response: The response from the REST API.

    Returns:
        This same SynchronizeGridRequest object, with `error_messages` updated.
    """
    # `dict.get` already yields None for a missing key, so a response without
    # `errorMessages` resets `error_messages` to None.
    self.error_messages = synapse_response.get("errorMessages")
    return self

def to_synapse_request(self) -> Dict[str, Any]:
    """
    Converts this dataclass to a dictionary suitable for a Synapse REST API request.

    The payload carries only the concrete type and the grid session ID;
    `error_messages` is response-side state and is never sent.

    Returns:
        A dictionary with the keys `concreteType` and `gridSessionId`.
    """
    return {
        "concreteType": self.concrete_type,
        "gridSessionId": self.grid_session_id,
    }


@dataclass
class GridSession:
"""
Expand Down Expand Up @@ -1572,6 +1620,50 @@ def export_to_record_set(
"""
return self

def synchronize(
    self, *, timeout: int = 120, synapse_client: Optional[Synapse] = None
) -> "Grid":
    """
    Synchronizes the grid session's schema and row data against its source entity.

    This is intended for grid sessions created from a file view via `initial_query`.
    Grid sessions backed by a RecordSet should use `export_to_record_set` instead.

    Arguments:
        timeout: The number of seconds to wait for the job to complete or progress
            before raising a SynapseTimeoutError. Defaults to 120.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.

    Returns:
        Grid: The Grid object.

    Raises:
        ValueError: If session_id is not provided.

    Example: Synchronize a grid session created from a file view
        &nbsp;

        ```python
        from synapseclient import Synapse
        from synapseclient.models import Grid
        from synapseclient.models.table_components import Query

        syn = Synapse()
        syn.login()

        # First create a grid session from a file view
        query = Query(sql="SELECT * FROM syn1234567")
        grid = Grid(initial_query=query)
        grid = grid.create()

        # Synchronize the grid with the latest state of the file view
        grid = grid.synchronize()
        ```
    """
    # NOTE(review): this body only returns self; presumably the real synchronous
    # behavior is supplied elsewhere (e.g. derived from `synchronize_async`) —
    # confirm against the enclosing class.
    return self

def delete(self, *, synapse_client: Optional[Synapse] = None) -> None:
"""
Delete the grid session.
Expand Down Expand Up @@ -2447,3 +2539,69 @@ async def main():
url_is_presigned=True,
synapse_client=synapse_client,
)

@otel_trace_method(
    method_to_trace_name=lambda self, **kwargs: f"Grid_Synchronize: ID: {self.session_id}"
)
async def synchronize_async(
    self, *, timeout: int = 120, synapse_client: Optional[Synapse] = None
) -> "Grid":
    """
    Synchronizes the grid session's schema and row data against its source entity.

    This is intended for grid sessions created from a file view via `initial_query`.
    Grid sessions backed by a RecordSet should use `export_to_record_set` instead.

    Error messages reported by the synchronization job do not raise; they are
    written to the client's logger at error level and the Grid is still returned.

    Arguments:
        timeout: The number of seconds to wait for the job to complete or progress
            before raising a SynapseTimeoutError. Defaults to 120.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.

    Returns:
        Grid: The Grid object.

    Raises:
        ValueError: If session_id is not provided.

    Example: Synchronize a grid session created from a file view
        &nbsp;

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import Grid
        from synapseclient.models.table_components import Query

        syn = Synapse()
        syn.login()

        async def main():
            # First create a grid session from a file view
            query = Query(sql="SELECT * FROM syn1234567")
            grid = Grid(initial_query=query)
            grid = await grid.create_async()

            # Synchronize the grid with the latest state of the file view
            grid = await grid.synchronize_async()

        asyncio.run(main())
        ```
    """
    # Fail fast before any request is built: a missing or empty session_id
    # cannot identify a grid session.
    if not self.session_id:
        raise ValueError("session_id is required to synchronize a GridSession")

    request = SynchronizeGridRequest(grid_session_id=self.session_id)
    result = await request.send_job_and_wait_async(
        timeout=timeout, synapse_client=synapse_client
    )

    # The client is only resolved when there is something to report, so the
    # clean path (None or empty error list) never touches the logger.
    if result.error_messages:
        client = Synapse.get_client(synapse_client=synapse_client)
        client.logger.error(
            f"Grid session '{self.session_id}' synchronization completed with "
            f"error messages: {result.error_messages}"
        )

    return self
2 changes: 2 additions & 0 deletions synapseclient/models/mixins/asynchronous_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
GRID_RECORD_SET_EXPORT_REQUEST,
QUERY_BUNDLE_REQUEST,
QUERY_TABLE_CSV_REQUEST,
SYNCHRONIZE_GRID_REQUEST,
TABLE_UPDATE_TRANSACTION_REQUEST,
UPLOAD_TO_TABLE_PREVIEW_REQUEST,
)
Expand All @@ -36,6 +37,7 @@
DOWNLOAD_FROM_GRID_REQUEST: "/grid/download/csv/async",
DOWNLOAD_LIST_MANIFEST_REQUEST: "/download/list/manifest/async",
GRID_RECORD_SET_EXPORT_REQUEST: "/grid/export/recordset/async",
SYNCHRONIZE_GRID_REQUEST: "/grid/synchronize/async",
TABLE_UPDATE_TRANSACTION_REQUEST: "/entity/{entityId}/table/transaction/async",
GET_VALIDATION_SCHEMA_REQUEST: "/schema/type/validation/async",
CREATE_SCHEMA_REQUEST: "/schema/type/create/async",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,15 @@
import pytest

from synapseclient import Synapse
from synapseclient.models import File, Grid, Project, RecordSet
from synapseclient.models import (
EntityView,
Folder,
Grid,
Project,
RecordSet,
ViewTypeMask,
)
from synapseclient.models.table_components import Query
from tests.integration import ASYNC_JOB_TIMEOUT_SEC


Expand All @@ -21,6 +29,31 @@ def init(self, syn: Synapse, schedule_for_cleanup: Callable[..., None]) -> None:
self.syn = syn
self.schedule_for_cleanup = schedule_for_cleanup

@pytest.fixture(scope="class")
async def entity_view(
    self,
    project_model: Project,
    syn: Synapse,
    schedule_for_cleanup: Callable[..., None],
) -> EntityView:
    """Create a folder and a file-type EntityView scoped to it for Grid testing.

    Both entities are stored under `project_model` with random names and are
    registered with `schedule_for_cleanup`.

    Returns:
        The stored EntityView.
    """
    # The folder is the view's only scope.
    folder = await Folder(
        name=str(uuid.uuid4()),
        parent_id=project_model.id,
    ).store_async(synapse_client=syn)
    schedule_for_cleanup(folder.id)

    # Local is named `view` so it does not shadow the fixture's own name.
    view = await EntityView(
        name=str(uuid.uuid4()),
        parent_id=project_model.id,
        scope_ids=[folder.id],
        view_type_mask=ViewTypeMask.FILE.value,
    ).store_async(synapse_client=syn)
    schedule_for_cleanup(view.id)

    return view
Comment thread
linglp marked this conversation as resolved.

@pytest.fixture(scope="function")
async def record_set_fixture(self, project_model: Project) -> RecordSet:
"""Create a RecordSet fixture for Grid testing."""
Expand Down Expand Up @@ -184,6 +217,21 @@ async def test_delete_grid_session_validation_error_async(self) -> None:
):
await grid.delete_async(synapse_client=self.syn)

async def test_synchronize_grid_async(
    self,
    entity_view: EntityView,
) -> None:
    """Synchronizing a grid session created from a file view returns the grid."""
    # GIVEN: A grid session created from a query against the entity view
    query = Query(sql=f"SELECT * FROM {entity_view.id}")
    grid = Grid(initial_query=query)
    created_grid = await grid.create_async(synapse_client=self.syn)

    # WHEN: Synchronizing the grid session
    grid = await created_grid.synchronize_async(synapse_client=self.syn)

    # THEN: The returned grid still refers to the same source entity and session
    # NOTE(review): these assertions only show the call completed and returned
    # the grid; they do not verify that any data was synchronized, and the
    # created grid session is not deleted afterwards — confirm that is intended.
    assert grid.source_entity_id == entity_view.id
    assert grid.session_id == created_grid.session_id

async def test_import_csv_to_grid_session_async(
self,
record_set_fixture: RecordSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
GridCsvImportRequest,
GridRecordSetExportRequest,
RecordBasedMetadataTaskProperties,
SynchronizeGridRequest,
UploadToTablePreviewRequest,
_create_task_properties_from_dict,
)
Expand Down Expand Up @@ -1517,3 +1518,90 @@ async def test_download_csv_async_empty_file_handle_id(self):
"the job may have failed silently.",
):
await grid.download_csv_async(synapse_client=self.syn)


class TestSynchronizeGridRequest:
    """Unit tests for SynchronizeGridRequest request building and response parsing."""

    def test_to_synapse_request(self) -> None:
        # GIVEN a SynchronizeGridRequest for a grid session
        sync_req = SynchronizeGridRequest(
            grid_session_id=SESSION_ID,
        )

        # WHEN I convert it to a synapse request
        result = sync_req.to_synapse_request()

        # THEN it should contain exactly the concrete type and the session ID
        assert result == {
            "concreteType": "org.sagebionetworks.repo.model.grid.SynchronizeGridRequest",
            "gridSessionId": SESSION_ID,
        }

    def test_fill_from_dict(self) -> None:
        # GIVEN a response with synchronize grid session data
        raw_response = {
            "jobId": "1234",
            "concreteType": "org.sagebionetworks.repo.model.grid.SynchronizeGridResponse",
            "gridSessionId": SESSION_ID,
            "errorMessages": ["test_error"],
        }

        # WHEN I fill a SynchronizeGridRequest from the response
        sync_req = SynchronizeGridRequest(grid_session_id=SESSION_ID)
        response = sync_req.fill_from_dict(raw_response)

        # THEN the same object is returned with the error messages captured
        assert response is sync_req
        assert response.error_messages == ["test_error"]
        assert response.grid_session_id == SESSION_ID


class TestSynchronizeGrid:
    """Unit tests for Grid.synchronize_async with the async job mocked out."""

    @pytest.fixture(autouse=True, scope="function")
    def init_syn(self, syn: Synapse) -> None:
        self.syn = syn

    async def test_synchronize_grid_async_without_session_id_raises(self) -> None:
        # GIVEN a Grid without a session_id
        grid = Grid()

        # WHEN I call synchronize_async
        # THEN it should raise ValueError
        with pytest.raises(ValueError, match="session_id is required to synchronize"):
            await grid.synchronize_async(synapse_client=self.syn)

    async def test_synchronize_grid_async_empty_error(self) -> None:
        # GIVEN a Grid with a session_id and a job result with no error messages
        grid = Grid(session_id=SESSION_ID)
        mock_sync_response = SynchronizeGridRequest(
            grid_session_id=SESSION_ID,
            error_messages=[],
        )

        # WHEN I call synchronize_async
        with patch(
            "synapseclient.models.curation.SynchronizeGridRequest.send_job_and_wait_async",
            new_callable=AsyncMock,
            return_value=mock_sync_response,
        ) as mock_sync:
            with patch.object(self.syn, "logger") as mock_logger:
                result = await grid.synchronize_async(synapse_client=self.syn)

        # THEN the job should be sent once with the client and the default timeout
        mock_sync.assert_called_once_with(synapse_client=self.syn, timeout=120)
        # AND nothing should be logged as an error for an empty error list
        mock_logger.error.assert_not_called()
        # AND the same Grid should be returned
        assert result is grid

    async def test_synchronize_grid_async_with_errors(self) -> None:
        # GIVEN a Grid with a session_id and a job result carrying error messages
        grid = Grid(session_id=SESSION_ID)
        mock_sync_response = SynchronizeGridRequest(
            grid_session_id=SESSION_ID,
            error_messages=["sync_error_1", "sync_error_2"],
        )

        # WHEN I call synchronize_async
        with patch(
            "synapseclient.models.curation.SynchronizeGridRequest.send_job_and_wait_async",
            new_callable=AsyncMock,
            return_value=mock_sync_response,
        ):
            with patch.object(self.syn, "logger") as mock_logger:
                await grid.synchronize_async(synapse_client=self.syn)

        # THEN the error messages should be logged as an error
        mock_logger.error.assert_called_once()
        error_message = mock_logger.error.call_args[0][0]
        assert "sync_error_1" in error_message
        assert "sync_error_2" in error_message
Loading