From 076cae3d7ad10d37784edec26106b9ee6d366ad9 Mon Sep 17 00:00:00 2001 From: ferponse Date: Wed, 18 Mar 2026 15:08:35 +0100 Subject: [PATCH 1/5] feat(sessions): add pagination support to list_sessions Add page_size and page_token parameters to list_sessions() across all session service implementations to avoid unbounded queries on large datasets. Closes #4871 --- .../adk/sessions/base_session_service.py | 41 ++++- .../adk/sessions/database_session_service.py | 34 +++- .../adk/sessions/in_memory_session_service.py | 71 ++++++-- .../adk/sessions/sqlite_session_service.py | 39 ++++- .../adk/sessions/vertex_ai_session_service.py | 7 +- .../sessions/test_session_service.py | 161 ++++++++++++++++++ 6 files changed, 326 insertions(+), 27 deletions(-) diff --git a/src/google/adk/sessions/base_session_service.py b/src/google/adk/sessions/base_session_service.py index eb22a83bb9..2e45529557 100644 --- a/src/google/adk/sessions/base_session_service.py +++ b/src/google/adk/sessions/base_session_service.py @@ -15,6 +15,7 @@ from __future__ import annotations import abc +import base64 from typing import Any from typing import Optional @@ -25,6 +26,29 @@ from .session import Session from .state import State +_DEFAULT_PAGE_SIZE = 20 +_MAX_PAGE_SIZE = 100 + + +def _resolve_page_size(page_size: Optional[int]) -> int: + """Clamp *page_size* to [1, _MAX_PAGE_SIZE], defaulting to _DEFAULT_PAGE_SIZE.""" + if page_size is None: + return _DEFAULT_PAGE_SIZE + return max(1, min(page_size, _MAX_PAGE_SIZE)) + + +def _encode_page_token(offset: int) -> str: + return base64.b64encode(str(offset).encode()).decode() + + +def _decode_page_token(token: Optional[str]) -> int: + if not token: + return 0 + try: + return max(0, int(base64.b64decode(token).decode())) + except (ValueError, Exception): + return 0 + class GetSessionConfig(BaseModel): """The configuration of getting a session.""" @@ -40,6 +64,7 @@ class ListSessionsResponse(BaseModel): """ sessions: list[Session] = Field(default_factory=list) + next_page_token: Optional[str] = None class BaseSessionService(abc.ABC): @@ -83,17 +108,27 @@ async def get_session( @abc.abstractmethod async def list_sessions( - self, *, app_name: str, user_id: Optional[str] = None + self, + *, + app_name: str, + user_id: Optional[str] = None, + page_size: Optional[int] = None, + page_token: Optional[str] = None, ) -> ListSessionsResponse: - """Lists all the sessions for a user. + """Lists sessions, optionally filtered by user, with pagination. Args: app_name: The name of the app. user_id: The ID of the user. If not provided, lists all sessions for all users. + page_size: Maximum number of sessions to return per page. Defaults to 20, + maximum 100. + page_token: Token returned from a previous ``list_sessions`` call to + fetch the next page. Returns: - A ListSessionsResponse containing the sessions. + A ListSessionsResponse containing the sessions and an optional + ``next_page_token`` for fetching subsequent pages. """ @abc.abstractmethod diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 1aeb464b7f..fa2a326dac 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -43,6 +43,9 @@ from .base_session_service import BaseSessionService from .base_session_service import GetSessionConfig from .base_session_service import ListSessionsResponse +from .base_session_service import _decode_page_token +from .base_session_service import _encode_page_token +from .base_session_service import _resolve_page_size from .migration import _schema_check_utils from .schemas.v0 import Base as BaseV0 from .schemas.v0 import StorageAppState as StorageAppStateV0 @@ -506,10 +509,19 @@ async def get_session( @override async def list_sessions( - self, *, app_name: str, user_id: Optional[str] = None + self, + *, + app_name: str, + user_id: Optional[str] = None, + page_size: Optional[int] = None, + page_token: Optional[str] = None, ) -> ListSessionsResponse: await self._prepare_tables() schema = self._get_schema_classes() + + effective_page_size = _resolve_page_size(page_size) + offset = _decode_page_token(page_token) + async with self._rollback_on_exception_session( read_only=True ) as sql_session: @@ -519,8 +531,16 @@ async def list_sessions( if user_id is not None: stmt = stmt.filter(schema.StorageSession.user_id == user_id) + stmt = stmt.order_by(schema.StorageSession.update_time.desc()) + # Fetch one extra row to determine if there is a next page. + stmt = stmt.offset(offset).limit(effective_page_size + 1) + result = await sql_session.execute(stmt) - results = result.scalars().all() + results = list(result.scalars().all()) + + has_next_page = len(results) > effective_page_size + if has_next_page: + results = results[:effective_page_size] # Fetch app state from storage storage_app_state = await sql_session.get( @@ -554,7 +574,15 @@ async def list_sessions( sessions.append( storage_session.to_session(state=merged_state, is_sqlite=is_sqlite) ) - return ListSessionsResponse(sessions=sessions) + + next_page_token = ( + _encode_page_token(offset + effective_page_size) + if has_next_page + else None + ) + return ListSessionsResponse( + sessions=sessions, next_page_token=next_page_token + ) @override async def delete_session( diff --git a/src/google/adk/sessions/in_memory_session_service.py b/src/google/adk/sessions/in_memory_session_service.py index e0f9b49ff3..e629d0898c 100644 --- a/src/google/adk/sessions/in_memory_session_service.py +++ b/src/google/adk/sessions/in_memory_session_service.py @@ -28,6 +28,9 @@ from .base_session_service import BaseSessionService from .base_session_service import GetSessionConfig from .base_session_service import ListSessionsResponse +from .base_session_service import _decode_page_token +from .base_session_service import _encode_page_token +from .base_session_service import _resolve_page_size from .session import Session from .state import State @@ -220,18 +223,43 @@ def _merge_state( @override async def list_sessions( - self, *, app_name: str, user_id: Optional[str] = None + self, + *, + app_name: str, + user_id: Optional[str] = None, + page_size: Optional[int] = None, + page_token: Optional[str] = None, ) -> ListSessionsResponse: - return self._list_sessions_impl(app_name=app_name, user_id=user_id) + return self._list_sessions_impl( + app_name=app_name, + user_id=user_id, + page_size=page_size, + page_token=page_token, + ) def list_sessions_sync( - self, *, app_name: str, user_id: Optional[str] = None + self, + *, + app_name: str, + user_id: Optional[str] = None, + page_size: Optional[int] = None, + page_token: Optional[str] = None, ) -> ListSessionsResponse: logger.warning('Deprecated. Please migrate to the async method.') - return self._list_sessions_impl(app_name=app_name, user_id=user_id) + return self._list_sessions_impl( + app_name=app_name, + user_id=user_id, + page_size=page_size, + page_token=page_token, + ) def _list_sessions_impl( - self, *, app_name: str, user_id: Optional[str] = None + self, + *, + app_name: str, + user_id: Optional[str] = None, + page_size: Optional[int] = None, + page_token: Optional[str] = None, ) -> ListSessionsResponse: empty_response = ListSessionsResponse() if app_name not in self.sessions: @@ -239,23 +267,40 @@ def _list_sessions_impl( if user_id is not None and user_id not in self.sessions[app_name]: return empty_response - sessions_without_events = [] + all_sessions = [] if user_id is None: - for user_id in self.sessions[app_name]: - for session_id in self.sessions[app_name][user_id]: - session = self.sessions[app_name][user_id][session_id] + for uid in self.sessions[app_name]: + for session_id in self.sessions[app_name][uid]: + session = self.sessions[app_name][uid][session_id] copied_session = copy.deepcopy(session) copied_session.events = [] - copied_session = self._merge_state(app_name, user_id, copied_session) - sessions_without_events.append(copied_session) + copied_session = self._merge_state(app_name, uid, copied_session) + all_sessions.append(copied_session) else: for session in self.sessions[app_name][user_id].values(): copied_session = copy.deepcopy(session) copied_session.events = [] copied_session = self._merge_state(app_name, user_id, copied_session) - sessions_without_events.append(copied_session) - return ListSessionsResponse(sessions=sessions_without_events) + all_sessions.append(copied_session) + + # Sort by last_update_time descending (most recently updated first) + all_sessions.sort( + key=lambda s: s.last_update_time if s.last_update_time else 0, + reverse=True, + ) + + effective_page_size = _resolve_page_size(page_size) + offset = _decode_page_token(page_token) + + page = all_sessions[offset : offset + effective_page_size] + has_next_page = (offset + effective_page_size) < len(all_sessions) + next_page_token = ( + _encode_page_token(offset + effective_page_size) + if has_next_page + else None + ) + return ListSessionsResponse(sessions=page, next_page_token=next_page_token) @override async def delete_session( diff --git a/src/google/adk/sessions/sqlite_session_service.py b/src/google/adk/sessions/sqlite_session_service.py index 3ad84e9d1a..6adf62da34 100644 --- a/src/google/adk/sessions/sqlite_session_service.py +++ b/src/google/adk/sessions/sqlite_session_service.py @@ -35,6 +35,9 @@ from .base_session_service import BaseSessionService from .base_session_service import GetSessionConfig from .base_session_service import ListSessionsResponse +from .base_session_service import _decode_page_token +from .base_session_service import _encode_page_token +from .base_session_service import _resolve_page_size from .session import Session from .state import State @@ -292,24 +295,38 @@ async def get_session( @override async def list_sessions( - self, *, app_name: str, user_id: Optional[str] = None + self, + *, + app_name: str, + user_id: Optional[str] = None, + page_size: Optional[int] = None, + page_token: Optional[str] = None, ) -> ListSessionsResponse: + effective_page_size = _resolve_page_size(page_size) + offset = _decode_page_token(page_token) + sessions_list = [] async with self._get_db_connection() as db: - # Fetch sessions + # Fetch sessions with ORDER BY / LIMIT / OFFSET if user_id: session_rows = await db.execute_fetchall( "SELECT id, user_id, state, update_time FROM sessions WHERE" - " app_name=? AND user_id=?", - (app_name, user_id), + " app_name=? AND user_id=?" + " ORDER BY update_time DESC LIMIT ? OFFSET ?", + (app_name, user_id, effective_page_size + 1, offset), ) else: session_rows = await db.execute_fetchall( "SELECT id, user_id, state, update_time FROM sessions WHERE" - " app_name=?", - (app_name,), + " app_name=?" + " ORDER BY update_time DESC LIMIT ? OFFSET ?", + (app_name, effective_page_size + 1, offset), ) + has_next_page = len(session_rows) > effective_page_size + if has_next_page: + session_rows = session_rows[:effective_page_size] + # Fetch app state app_state = await self._get_app_state(db, app_name) @@ -343,7 +360,15 @@ async def list_sessions( last_update_time=row["update_time"], ) ) - return ListSessionsResponse(sessions=sessions_list) + + next_page_token = ( + _encode_page_token(offset + effective_page_size) + if has_next_page + else None + ) + return ListSessionsResponse( + sessions=sessions_list, next_page_token=next_page_token + ) @override async def delete_session( diff --git a/src/google/adk/sessions/vertex_ai_session_service.py b/src/google/adk/sessions/vertex_ai_session_service.py index 9e5c9bb2ec..0a83dc5057 100644 --- a/src/google/adk/sessions/vertex_ai_session_service.py +++ b/src/google/adk/sessions/vertex_ai_session_service.py @@ -216,7 +216,12 @@ async def get_session( @override async def list_sessions( - self, *, app_name: str, user_id: Optional[str] = None + self, + *, + app_name: str, + user_id: Optional[str] = None, + page_size: Optional[int] = None, + page_token: Optional[str] = None, ) -> ListSessionsResponse: reasoning_engine_id = self._get_reasoning_engine_id(app_name) diff --git a/tests/unittests/sessions/test_session_service.py b/tests/unittests/sessions/test_session_service.py index 8a56600edc..f65be4ea29 100644 --- a/tests/unittests/sessions/test_session_service.py +++ b/tests/unittests/sessions/test_session_service.py @@ -1481,3 +1481,164 @@ async def tracking_fn(**kwargs): finally: database_session_service._select_required_state = original_fn await service.close() + + +# --------------------------------------------------------------------------- +# Pagination tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_list_sessions_default_pagination(session_service): + """Without explicit page_size, the first 20 sessions are returned.""" + app_name = 'pagination_app' + user_id = 'user' + num_sessions = 25 + + for i in range(num_sessions): + await session_service.create_session( + app_name=app_name, user_id=user_id, session_id=f's{i:03d}' + ) + + response = await session_service.list_sessions( + app_name=app_name, user_id=user_id + ) + assert len(response.sessions) == 20 + assert response.next_page_token is not None + + +@pytest.mark.asyncio +async def test_list_sessions_custom_page_size(session_service): + """Explicit page_size is respected.""" + app_name = 'pagination_app2' + user_id = 'user' + + for i in range(10): + await session_service.create_session( + app_name=app_name, user_id=user_id, session_id=f's{i}' + ) + + response = await session_service.list_sessions( + app_name=app_name, user_id=user_id, page_size=3 + ) + assert len(response.sessions) == 3 + assert response.next_page_token is not None + + +@pytest.mark.asyncio +async def test_list_sessions_page_size_clamped_to_max(session_service): + """page_size > 100 is clamped to 100.""" + app_name = 'pagination_clamp' + user_id = 'user' + + for i in range(5): + await session_service.create_session( + app_name=app_name, user_id=user_id, session_id=f's{i}' + ) + + response = await session_service.list_sessions( + app_name=app_name, user_id=user_id, page_size=999 + ) + # Only 5 sessions exist, so all are returned and no next page. + assert len(response.sessions) == 5 + assert response.next_page_token is None + + +@pytest.mark.asyncio +async def test_list_sessions_iterate_all_pages(session_service): + """Iterating with page_token collects every session exactly once.""" + app_name = 'pagination_iter' + user_id = 'user' + total = 7 + page_size = 3 + + for i in range(total): + await session_service.create_session( + app_name=app_name, user_id=user_id, session_id=f's{i}' + ) + + collected_ids = [] + page_token = None + while True: + response = await session_service.list_sessions( + app_name=app_name, + user_id=user_id, + page_size=page_size, + page_token=page_token, + ) + collected_ids.extend(s.id for s in response.sessions) + if response.next_page_token is None: + break + page_token = response.next_page_token + + assert sorted(collected_ids) == sorted([f's{i}' for i in range(total)]) + + +@pytest.mark.asyncio +async def test_list_sessions_no_next_token_when_exact_fit(session_service): + """When total == page_size, next_page_token should be None.""" + app_name = 'pagination_exact' + user_id = 'user' + + for i in range(5): + await session_service.create_session( + app_name=app_name, user_id=user_id, session_id=f's{i}' + ) + + response = await session_service.list_sessions( + app_name=app_name, user_id=user_id, page_size=5 + ) + assert len(response.sessions) == 5 + assert response.next_page_token is None + + +@pytest.mark.asyncio +async def test_list_sessions_empty_result(session_service): + """Listing sessions for a non-existent app returns empty with no token.""" + response = await session_service.list_sessions( + app_name='nonexistent_app', user_id='nobody' + ) + assert len(response.sessions) == 0 + assert response.next_page_token is None + + +@pytest.mark.asyncio +async def test_list_sessions_backward_compatible_no_args(session_service): + """Calling list_sessions without pagination args still works (backward compat).""" + app_name = 'compat_app' + user_id = 'user' + + for i in range(3): + await session_service.create_session( + app_name=app_name, user_id=user_id, session_id=f's{i}' + ) + + response = await session_service.list_sessions( + app_name=app_name, user_id=user_id + ) + assert len(response.sessions) == 3 + assert response.next_page_token is None + + +@pytest.mark.asyncio +async def test_list_sessions_ordered_by_update_time_desc(session_service): + """Sessions are returned most-recently-updated first.""" + app_name = 'order_app' + user_id = 'user' + + s0 = await session_service.create_session( + app_name=app_name, user_id=user_id, session_id='s0' + ) + s1 = await session_service.create_session( + app_name=app_name, user_id=user_id, session_id='s1' + ) + s2 = await session_service.create_session( + app_name=app_name, user_id=user_id, session_id='s2' + ) + + response = await session_service.list_sessions( + app_name=app_name, user_id=user_id, page_size=100 + ) + ids = [s.id for s in response.sessions] + # Most recently created (and thus updated) should come first. + assert ids == ['s2', 's1', 's0'] From 4aa14ad336d5ee0f81e9de3df9b8473d56e3bd55 Mon Sep 17 00:00:00 2001 From: ferponse Date: Wed, 18 Mar 2026 15:12:57 +0100 Subject: [PATCH 2/5] fix(sessions): narrow exception handling in _decode_page_token Catch (ValueError, TypeError) instead of (ValueError, Exception) to avoid masking unexpected programming errors. --- src/google/adk/sessions/base_session_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/google/adk/sessions/base_session_service.py b/src/google/adk/sessions/base_session_service.py index 2e45529557..20df92ec0a 100644 --- a/src/google/adk/sessions/base_session_service.py +++ b/src/google/adk/sessions/base_session_service.py @@ -46,7 +46,7 @@ def _decode_page_token(token: Optional[str]) -> int: return 0 try: return max(0, int(base64.b64decode(token).decode())) - except (ValueError, Exception): + except (ValueError, TypeError): return 0 From 7b8ce90d35230ddb974a8cc589b54bcb91836bfd Mon Sep 17 00:00:00 2001 From: ferponse Date: Fri, 20 Mar 2026 10:08:02 +0100 Subject: [PATCH 3/5] refactor(sessions): return all sessions by default, paginate only on request When page_size is not provided, list_sessions() now returns the complete list (preserving backward compatibility). Pagination only activates when the caller explicitly passes a page_size. --- .../adk/sessions/base_session_service.py | 11 ++--- .../adk/sessions/database_session_service.py | 9 ++-- .../adk/sessions/in_memory_session_service.py | 3 ++ .../adk/sessions/sqlite_session_service.py | 49 +++++++++++++------ .../sessions/test_session_service.py | 8 +-- 5 files changed, 52 insertions(+), 28 deletions(-) diff --git a/src/google/adk/sessions/base_session_service.py b/src/google/adk/sessions/base_session_service.py index 20df92ec0a..98bd2e9d42 100644 --- a/src/google/adk/sessions/base_session_service.py +++ b/src/google/adk/sessions/base_session_service.py @@ -26,14 +26,13 @@ from .session import Session from .state import State -_DEFAULT_PAGE_SIZE = 20 _MAX_PAGE_SIZE = 100 -def _resolve_page_size(page_size: Optional[int]) -> int: - """Clamp *page_size* to [1, _MAX_PAGE_SIZE], defaulting to _DEFAULT_PAGE_SIZE.""" +def _resolve_page_size(page_size: Optional[int]) -> Optional[int]: + """Clamp *page_size* to [1, _MAX_PAGE_SIZE], or None for unlimited.""" if page_size is None: - return _DEFAULT_PAGE_SIZE + return None return max(1, min(page_size, _MAX_PAGE_SIZE)) @@ -121,8 +120,8 @@ async def list_sessions( app_name: The name of the app. user_id: The ID of the user. If not provided, lists all sessions for all users. - page_size: Maximum number of sessions to return per page. Defaults to 20, - maximum 100. + page_size: Maximum number of sessions to return per page. If not + provided, all sessions are returned (no pagination). Maximum 100. page_token: Token returned from a previous ``list_sessions`` call to fetch the next page. diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index fa2a326dac..ca357e35e0 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -532,13 +532,16 @@ async def list_sessions( stmt = stmt.filter(schema.StorageSession.user_id == user_id) stmt = stmt.order_by(schema.StorageSession.update_time.desc()) - # Fetch one extra row to determine if there is a next page. - stmt = stmt.offset(offset).limit(effective_page_size + 1) + if effective_page_size is not None: + # Fetch one extra row to determine if there is a next page. + stmt = stmt.offset(offset).limit(effective_page_size + 1) result = await sql_session.execute(stmt) results = list(result.scalars().all()) - has_next_page = len(results) > effective_page_size + has_next_page = ( + effective_page_size is not None and len(results) > effective_page_size + ) if has_next_page: results = results[:effective_page_size] diff --git a/src/google/adk/sessions/in_memory_session_service.py b/src/google/adk/sessions/in_memory_session_service.py index e629d0898c..fa47e0e616 100644 --- a/src/google/adk/sessions/in_memory_session_service.py +++ b/src/google/adk/sessions/in_memory_session_service.py @@ -293,6 +293,9 @@ def _list_sessions_impl( effective_page_size = _resolve_page_size(page_size) offset = _decode_page_token(page_token) + if effective_page_size is None: + return ListSessionsResponse(sessions=all_sessions) + page = all_sessions[offset : offset + effective_page_size] has_next_page = (offset + effective_page_size) < len(all_sessions) next_page_token = ( diff --git a/src/google/adk/sessions/sqlite_session_service.py b/src/google/adk/sessions/sqlite_session_service.py index 6adf62da34..f56b51a8d3 100644 --- a/src/google/adk/sessions/sqlite_session_service.py +++ b/src/google/adk/sessions/sqlite_session_service.py @@ -307,23 +307,42 @@ async def list_sessions( sessions_list = [] async with self._get_db_connection() as db: - # Fetch sessions with ORDER BY / LIMIT / OFFSET - if user_id: - session_rows = await db.execute_fetchall( - "SELECT id, user_id, state, update_time FROM sessions WHERE" - " app_name=? AND user_id=?" - " ORDER BY update_time DESC LIMIT ? OFFSET ?", - (app_name, user_id, effective_page_size + 1, offset), - ) + # Fetch sessions with ORDER BY and optional LIMIT / OFFSET + if effective_page_size is not None: + if user_id: + session_rows = await db.execute_fetchall( + "SELECT id, user_id, state, update_time FROM sessions WHERE" + " app_name=? AND user_id=?" + " ORDER BY update_time DESC LIMIT ? OFFSET ?", + (app_name, user_id, effective_page_size + 1, offset), + ) + else: + session_rows = await db.execute_fetchall( + "SELECT id, user_id, state, update_time FROM sessions WHERE" + " app_name=?" + " ORDER BY update_time DESC LIMIT ? OFFSET ?", + (app_name, effective_page_size + 1, offset), + ) else: - session_rows = await db.execute_fetchall( - "SELECT id, user_id, state, update_time FROM sessions WHERE" - " app_name=?" - " ORDER BY update_time DESC LIMIT ? OFFSET ?", - (app_name, effective_page_size + 1, offset), - ) + if user_id: + session_rows = await db.execute_fetchall( + "SELECT id, user_id, state, update_time FROM sessions WHERE" + " app_name=? AND user_id=?" + " ORDER BY update_time DESC", + (app_name, user_id), + ) + else: + session_rows = await db.execute_fetchall( + "SELECT id, user_id, state, update_time FROM sessions WHERE" + " app_name=?" + " ORDER BY update_time DESC", + (app_name,), + ) - has_next_page = len(session_rows) > effective_page_size + has_next_page = ( + effective_page_size is not None + and len(session_rows) > effective_page_size + ) if has_next_page: session_rows = session_rows[:effective_page_size] diff --git a/tests/unittests/sessions/test_session_service.py b/tests/unittests/sessions/test_session_service.py index f65be4ea29..c308f05778 100644 --- a/tests/unittests/sessions/test_session_service.py +++ b/tests/unittests/sessions/test_session_service.py @@ -1489,8 +1489,8 @@ async def tracking_fn(**kwargs): @pytest.mark.asyncio -async def test_list_sessions_default_pagination(session_service): - """Without explicit page_size, the first 20 sessions are returned.""" +async def test_list_sessions_default_returns_all(session_service): + """Without explicit page_size, all sessions are returned (no pagination).""" app_name = 'pagination_app' user_id = 'user' num_sessions = 25 @@ -1503,8 +1503,8 @@ async def test_list_sessions_default_pagination(session_service): response = await session_service.list_sessions( app_name=app_name, user_id=user_id ) - assert len(response.sessions) == 20 - assert response.next_page_token is not None + assert len(response.sessions) == num_sessions + assert response.next_page_token is None @pytest.mark.asyncio From b14687ce7901a7d80a461cd0b822fa405a9dbe20 Mon Sep 17 00:00:00 2001 From: ferponse Date: Fri, 20 Mar 2026 10:21:54 +0100 Subject: [PATCH 4/5] refactor(sessions): encapsulate pagination in SessionPagination class Replace loose page_size/page_token/page_offset parameters with a single optional SessionPagination object. When omitted, list_sessions() returns all sessions (no pagination). When provided, it controls page_size, page_token, and page_offset in a clean, encapsulated way. --- .../adk/sessions/base_session_service.py | 43 +++++++--- .../adk/sessions/database_session_service.py | 22 +++--- .../adk/sessions/in_memory_session_service.py | 34 +++----- .../adk/sessions/sqlite_session_service.py | 25 +++--- .../adk/sessions/vertex_ai_session_service.py | 4 +- .../sessions/test_session_service.py | 79 +++++++++++++++++-- 6 files changed, 138 insertions(+), 69 deletions(-) diff --git a/src/google/adk/sessions/base_session_service.py b/src/google/adk/sessions/base_session_service.py index 98bd2e9d42..9dfc987823 100644 --- a/src/google/adk/sessions/base_session_service.py +++ b/src/google/adk/sessions/base_session_service.py @@ -29,13 +29,6 @@ _MAX_PAGE_SIZE = 100 -def _resolve_page_size(page_size: Optional[int]) -> Optional[int]: - """Clamp *page_size* to [1, _MAX_PAGE_SIZE], or None for unlimited.""" - if page_size is None: - return None - return max(1, min(page_size, _MAX_PAGE_SIZE)) - - def _encode_page_token(offset: int) -> str: return base64.b64encode(str(offset).encode()).decode() @@ -49,6 +42,33 @@ def _decode_page_token(token: Optional[str]) -> int: return 0 +class SessionPagination(BaseModel): + """Pagination configuration for ``list_sessions``. + + When passed to ``list_sessions``, only ``page_size`` results are returned + per call. When omitted, all sessions are returned at once (no pagination). + """ + + page_size: int = 20 + """Maximum number of sessions per page (1–100, clamped automatically).""" + + page_token: Optional[str] = None + """Opaque token returned by a previous call to fetch the next page.""" + + page_offset: Optional[int] = None + """Optional 0-based starting offset. Takes precedence over ``page_token``.""" + + @property + def effective_page_size(self) -> int: + return max(1, min(self.page_size, _MAX_PAGE_SIZE)) + + @property + def offset(self) -> int: + if self.page_offset is not None: + return max(0, self.page_offset) + return _decode_page_token(self.page_token) + + class GetSessionConfig(BaseModel): """The configuration of getting a session.""" @@ -111,8 +131,7 @@ async def list_sessions( *, app_name: str, user_id: Optional[str] = None, - page_size: Optional[int] = None, - page_token: Optional[str] = None, + pagination: Optional[SessionPagination] = None, ) -> ListSessionsResponse: """Lists sessions, optionally filtered by user, with pagination. @@ -120,10 +139,8 @@ async def list_sessions( app_name: The name of the app. user_id: The ID of the user. If not provided, lists all sessions for all users. - page_size: Maximum number of sessions to return per page. If not - provided, all sessions are returned (no pagination). Maximum 100. - page_token: Token returned from a previous ``list_sessions`` call to - fetch the next page. + pagination: Optional pagination configuration. If not provided, all + sessions are returned at once. Returns: A ListSessionsResponse containing the sessions and an optional diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index ca357e35e0..8f40d503ef 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -43,9 +43,8 @@ from .base_session_service import BaseSessionService from .base_session_service import GetSessionConfig from .base_session_service import ListSessionsResponse -from .base_session_service import _decode_page_token +from .base_session_service import SessionPagination from .base_session_service import _encode_page_token -from .base_session_service import _resolve_page_size from .migration import _schema_check_utils from .schemas.v0 import Base as BaseV0 from .schemas.v0 import StorageAppState as StorageAppStateV0 @@ -513,15 +512,11 @@ async def list_sessions( *, app_name: str, user_id: Optional[str] = None, - page_size: Optional[int] = None, - page_token: Optional[str] = None, + pagination: Optional[SessionPagination] = None, ) -> ListSessionsResponse: await self._prepare_tables() schema = self._get_schema_classes() - effective_page_size = _resolve_page_size(page_size) - offset = _decode_page_token(page_token) - async with self._rollback_on_exception_session( read_only=True ) as sql_session: @@ -532,18 +527,21 @@ async def list_sessions( stmt = stmt.filter(schema.StorageSession.user_id == user_id) stmt = stmt.order_by(schema.StorageSession.update_time.desc()) - if effective_page_size is not None: + if pagination is not None: + page_size = pagination.effective_page_size + offset = pagination.offset # Fetch one extra row to determine if there is a next page. - stmt = stmt.offset(offset).limit(effective_page_size + 1) + stmt = stmt.offset(offset).limit(page_size + 1) result = await sql_session.execute(stmt) results = list(result.scalars().all()) has_next_page = ( - effective_page_size is not None and len(results) > effective_page_size + pagination is not None + and len(results) > pagination.effective_page_size ) if has_next_page: - results = results[:effective_page_size] + results = results[: pagination.effective_page_size] # Fetch app state from storage storage_app_state = await sql_session.get( @@ -579,7 +577,7 @@ async def list_sessions( ) next_page_token = ( - _encode_page_token(offset + effective_page_size) + _encode_page_token(pagination.offset + pagination.effective_page_size) if has_next_page else None ) diff --git a/src/google/adk/sessions/in_memory_session_service.py b/src/google/adk/sessions/in_memory_session_service.py index fa47e0e616..93e37bad84 100644 --- a/src/google/adk/sessions/in_memory_session_service.py +++ b/src/google/adk/sessions/in_memory_session_service.py @@ -28,9 +28,8 @@ from .base_session_service import BaseSessionService from .base_session_service import GetSessionConfig from .base_session_service import ListSessionsResponse -from .base_session_service import _decode_page_token +from .base_session_service import SessionPagination from .base_session_service import _encode_page_token -from .base_session_service import _resolve_page_size from .session import Session from .state import State @@ -227,14 +226,12 @@ async def list_sessions( *, app_name: str, user_id: Optional[str] = None, - page_size: Optional[int] = None, - page_token: Optional[str] = None, + pagination: Optional[SessionPagination] = None, ) -> ListSessionsResponse: return self._list_sessions_impl( app_name=app_name, user_id=user_id, - page_size=page_size, - page_token=page_token, + pagination=pagination, ) def list_sessions_sync( @@ -242,15 +239,13 @@ def list_sessions_sync( *, app_name: str, user_id: Optional[str] = None, - page_size: Optional[int] = None, - page_token: Optional[str] = None, + pagination: Optional[SessionPagination] = None, ) -> ListSessionsResponse: logger.warning('Deprecated. Please migrate to the async method.') return self._list_sessions_impl( app_name=app_name, user_id=user_id, - page_size=page_size, - page_token=page_token, + pagination=pagination, ) def _list_sessions_impl( @@ -258,8 +253,7 @@ def _list_sessions_impl( *, app_name: str, user_id: Optional[str] = None, - page_size: Optional[int] = None, - page_token: Optional[str] = None, + pagination: Optional[SessionPagination] = None, ) -> ListSessionsResponse: empty_response = ListSessionsResponse() if app_name not in self.sessions: @@ -290,18 +284,16 @@ def _list_sessions_impl( reverse=True, ) - effective_page_size = _resolve_page_size(page_size) - offset = _decode_page_token(page_token) - - if effective_page_size is None: + if pagination is None: return ListSessionsResponse(sessions=all_sessions) - page = all_sessions[offset : offset + effective_page_size] - has_next_page = (offset + effective_page_size) < len(all_sessions) + page_size = pagination.effective_page_size + offset = pagination.offset + + page = all_sessions[offset : offset + page_size] + has_next_page = (offset + page_size) < len(all_sessions) next_page_token = ( - _encode_page_token(offset + effective_page_size) - if has_next_page - else None + _encode_page_token(offset + page_size) if has_next_page else None ) return ListSessionsResponse(sessions=page, next_page_token=next_page_token) diff --git a/src/google/adk/sessions/sqlite_session_service.py b/src/google/adk/sessions/sqlite_session_service.py index f56b51a8d3..a0dc8b030b 100644 --- a/src/google/adk/sessions/sqlite_session_service.py +++ b/src/google/adk/sessions/sqlite_session_service.py @@ -35,9 +35,8 @@ from .base_session_service import BaseSessionService from .base_session_service import GetSessionConfig from .base_session_service import ListSessionsResponse -from .base_session_service import _decode_page_token +from .base_session_service import SessionPagination from .base_session_service import _encode_page_token -from .base_session_service import _resolve_page_size from .session import Session from .state import State @@ -299,29 +298,27 @@ async def list_sessions( *, app_name: str, user_id: Optional[str] = None, - page_size: Optional[int] = None, - page_token: Optional[str] = None, + pagination: Optional[SessionPagination] = None, ) -> ListSessionsResponse: - effective_page_size = _resolve_page_size(page_size) - offset = _decode_page_token(page_token) - sessions_list = [] async with self._get_db_connection() as db: # Fetch sessions with ORDER BY and optional LIMIT / OFFSET - if effective_page_size is not None: + if pagination is not None: + page_size = pagination.effective_page_size + offset = pagination.offset if user_id: session_rows = await db.execute_fetchall( "SELECT id, user_id, state, update_time FROM sessions WHERE" " app_name=? AND user_id=?" " ORDER BY update_time DESC LIMIT ? OFFSET ?", - (app_name, user_id, effective_page_size + 1, offset), + (app_name, user_id, page_size + 1, offset), ) else: session_rows = await db.execute_fetchall( "SELECT id, user_id, state, update_time FROM sessions WHERE" " app_name=?" " ORDER BY update_time DESC LIMIT ? OFFSET ?", - (app_name, effective_page_size + 1, offset), + (app_name, page_size + 1, offset), ) else: if user_id: @@ -340,11 +337,11 @@ async def list_sessions( ) has_next_page = ( - effective_page_size is not None - and len(session_rows) > effective_page_size + pagination is not None + and len(session_rows) > pagination.effective_page_size ) if has_next_page: - session_rows = session_rows[:effective_page_size] + session_rows = session_rows[: pagination.effective_page_size] # Fetch app state app_state = await self._get_app_state(db, app_name) @@ -381,7 +378,7 @@ async def list_sessions( ) next_page_token = ( - _encode_page_token(offset + effective_page_size) + _encode_page_token(pagination.offset + pagination.effective_page_size) if has_next_page else None ) diff --git a/src/google/adk/sessions/vertex_ai_session_service.py b/src/google/adk/sessions/vertex_ai_session_service.py index 0a83dc5057..71b30c23df 100644 --- a/src/google/adk/sessions/vertex_ai_session_service.py +++ b/src/google/adk/sessions/vertex_ai_session_service.py @@ -38,6 +38,7 @@ from .base_session_service import BaseSessionService from .base_session_service import GetSessionConfig from .base_session_service import ListSessionsResponse +from .base_session_service import SessionPagination from .session import Session logger = logging.getLogger('google_adk.' + __name__) @@ -220,8 +221,7 @@ async def list_sessions( *, app_name: str, user_id: Optional[str] = None, - page_size: Optional[int] = None, - page_token: Optional[str] = None, + pagination: Optional[SessionPagination] = None, ) -> ListSessionsResponse: reasoning_engine_id = self._get_reasoning_engine_id(app_name) diff --git a/tests/unittests/sessions/test_session_service.py b/tests/unittests/sessions/test_session_service.py index c308f05778..810411c61c 100644 --- a/tests/unittests/sessions/test_session_service.py +++ b/tests/unittests/sessions/test_session_service.py @@ -25,6 +25,7 @@ from google.adk.events.event_actions import EventActions from google.adk.sessions import database_session_service from google.adk.sessions.base_session_service import GetSessionConfig +from google.adk.sessions.base_session_service import SessionPagination from google.adk.sessions.database_session_service import DatabaseSessionService from google.adk.sessions.in_memory_session_service import InMemorySessionService from google.adk.sessions.sqlite_session_service import SqliteSessionService @@ -1519,7 +1520,9 @@ async def test_list_sessions_custom_page_size(session_service): ) response = await session_service.list_sessions( - app_name=app_name, user_id=user_id, page_size=3 + app_name=app_name, + user_id=user_id, + pagination=SessionPagination(page_size=3), ) assert len(response.sessions) == 3 assert response.next_page_token is not None @@ -1537,7 +1540,9 @@ async def test_list_sessions_page_size_clamped_to_max(session_service): ) response = await session_service.list_sessions( - app_name=app_name, user_id=user_id, page_size=999 + app_name=app_name, + user_id=user_id, + pagination=SessionPagination(page_size=999), ) # Only 5 sessions exist, so all are returned and no next page. assert len(response.sessions) == 5 @@ -1563,8 +1568,9 @@ async def test_list_sessions_iterate_all_pages(session_service): response = await session_service.list_sessions( app_name=app_name, user_id=user_id, - page_size=page_size, - page_token=page_token, + pagination=SessionPagination( + page_size=page_size, page_token=page_token + ), ) collected_ids.extend(s.id for s in response.sessions) if response.next_page_token is None: @@ -1586,7 +1592,9 @@ async def test_list_sessions_no_next_token_when_exact_fit(session_service): ) response = await session_service.list_sessions( - app_name=app_name, user_id=user_id, page_size=5 + app_name=app_name, + user_id=user_id, + pagination=SessionPagination(page_size=5), ) assert len(response.sessions) == 5 assert response.next_page_token is None @@ -1604,7 +1612,7 @@ async def test_list_sessions_empty_result(session_service): @pytest.mark.asyncio async def test_list_sessions_backward_compatible_no_args(session_service): - """Calling list_sessions without pagination args still works (backward compat).""" + """Calling list_sessions without pagination returns all sessions.""" app_name = 'compat_app' user_id = 'user' @@ -1637,8 +1645,65 @@ async def test_list_sessions_ordered_by_update_time_desc(session_service): ) response = await session_service.list_sessions( - app_name=app_name, user_id=user_id, page_size=100 + app_name=app_name, + user_id=user_id, + pagination=SessionPagination(page_size=100), ) ids = [s.id for s in response.sessions] # Most recently created (and thus updated) should come first. assert ids == ['s2', 's1', 's0'] + + +@pytest.mark.asyncio +async def test_list_sessions_page_offset_skips_to_position(session_service): + """page_offset allows jumping directly to a specific position.""" + app_name = 'offset_app' + user_id = 'user' + + for i in range(10): + await session_service.create_session( + app_name=app_name, user_id=user_id, session_id=f's{i}' + ) + + # Skip the first 5 sessions, get next 3 + response = await session_service.list_sessions( + app_name=app_name, + user_id=user_id, + pagination=SessionPagination(page_size=3, page_offset=5), + ) + assert len(response.sessions) == 3 + assert response.next_page_token is not None + + +@pytest.mark.asyncio +async def test_list_sessions_page_offset_overrides_page_token(session_service): + """page_offset takes precedence over page_token.""" + app_name = 'offset_override_app' + user_id = 'user' + + for i in range(10): + await session_service.create_session( + app_name=app_name, user_id=user_id, session_id=f's{i}' + ) + + # Get first page to obtain a page_token + first_response = await session_service.list_sessions( + app_name=app_name, + user_id=user_id, + pagination=SessionPagination(page_size=3), + ) + token = first_response.next_page_token + assert token is not None + + # Pass both page_token and page_offset — offset should win + response = await session_service.list_sessions( + app_name=app_name, + user_id=user_id, + pagination=SessionPagination( + page_size=3, page_token=token, page_offset=0 + ), + ) + # Should get the same first page as if starting from 0 + assert [s.id for s in response.sessions] == [ + s.id for s in first_response.sessions + ] From 62bb03409e98c41b62d25beb97b1e74c1425feef Mon Sep 17 00:00:00 2001 From: ferponse Date: Mon, 23 Mar 2026 11:23:42 +0100 Subject: [PATCH 5/5] fix(sessions): add explicit None checks for mypy type narrowing Add redundant `pagination is not None` checks alongside `has_next_page` so mypy can narrow the type correctly. --- src/google/adk/sessions/database_session_service.py | 4 ++-- src/google/adk/sessions/sqlite_session_service.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 8f40d503ef..cdd2879ad3 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -540,7 +540,7 @@ async def list_sessions( pagination is not None and len(results) > pagination.effective_page_size ) - if has_next_page: + if has_next_page and pagination is not None: results = results[: pagination.effective_page_size] # Fetch app state from storage @@ -578,7 +578,7 @@ async def list_sessions( next_page_token = ( _encode_page_token(pagination.offset + pagination.effective_page_size) - if has_next_page + if has_next_page and pagination is not None else None ) return ListSessionsResponse( diff --git a/src/google/adk/sessions/sqlite_session_service.py b/src/google/adk/sessions/sqlite_session_service.py index a0dc8b030b..6e31c876de 100644 --- a/src/google/adk/sessions/sqlite_session_service.py +++ b/src/google/adk/sessions/sqlite_session_service.py @@ -340,7 +340,7 @@ async def list_sessions( pagination is not None and len(session_rows) > pagination.effective_page_size ) - if has_next_page: + if has_next_page and pagination is not None: session_rows = session_rows[: pagination.effective_page_size] # Fetch app state @@ -379,7 +379,7 @@ async def list_sessions( next_page_token = ( _encode_page_token(pagination.offset + pagination.effective_page_size) - if has_next_page + if has_next_page and pagination is not None else None ) return ListSessionsResponse(