44
55import json
66import re
7- from collections .abc import Callable , Mapping
7+ from collections .abc import Callable , Iterator , Mapping , Sequence
88from dataclasses import dataclass , field
99from pathlib import Path
1010from typing import cast
class GraphQLError(RuntimeError):
    """Raised for GraphQL transport or application errors.

    Besides the message, each instance carries two metadata attributes that
    are set once by the constructor:

    * ``status_code`` -- the value supplied by the raiser, ``None`` when
      none was given.
    * ``is_application_error`` -- the flag supplied by the raiser,
      ``False`` when none was given.
    """

    def __init__(
        self,
        message: str,
        *,
        status_code: int | None = None,
        is_application_error: bool = False,
    ) -> None:
        """Initialise the exception with ``message`` and record its metadata.

        Both metadata arguments are keyword-only, so positional construction
        with just a message keeps working.
        """
        super().__init__(message)
        self.is_application_error: bool = is_application_error
        self.status_code: int | None = status_code
126+
116127
117128@dataclass
118129class GraphQLClient :
@@ -174,6 +185,49 @@ def execute_next_page(next_variables: JSONDict) -> JSONDict:
174185 )
175186 return data
176187
188+ def stream_connection_nodes (
189+ self ,
190+ query : str ,
191+ variables : Mapping [str , JSONValue ] | None = None ,
192+ * ,
193+ connection_path : Sequence [str ],
194+ page_size : int | None = None ,
195+ first_variable : str = "first" ,
196+ after_variable : str = "after" ,
197+ ) -> Iterator [JSONDict ]:
198+ """Stream one GraphQL connection's nodes page by page.
199+
200+ `connection_path` is the response path to the connection object that
201+ contains `nodes` and `pageInfo`, for example `("viewer", "items")`.
202+ Unlike `execute(..., follow_pages=True)`, this does not accumulate all
203+ nodes in memory before returning.
204+ """
205+ page_number = 1
206+
207+ def execute_page (
208+ operation : str , page_variables : Mapping [str , JSONValue ] | None
209+ ) -> JSONDict :
210+ nonlocal page_number
211+ data = self ._execute_once (
212+ operation ,
213+ dict (page_variables or {}),
214+ page_number = page_number ,
215+ first_variable = first_variable ,
216+ after_variable = after_variable ,
217+ )
218+ page_number += 1
219+ return data
220+
221+ yield from stream_connection_nodes (
222+ execute_page ,
223+ query ,
224+ variables ,
225+ connection_path = connection_path ,
226+ page_size = page_size ,
227+ first_variable = first_variable ,
228+ after_variable = after_variable ,
229+ )
230+
177231 def _execute_once (
178232 self ,
179233 query : str ,
@@ -200,15 +254,19 @@ def _execute_once(
200254 payload = self .http .json ("POST" , self .url , headers = self .headers , json_body = body )
201255 except HTTPClientError as exception :
202256 raise GraphQLError (
203- f"{ self .label } GraphQL request failed: { exception } "
257+ f"{ self .label } GraphQL request failed: { exception } " ,
258+ status_code = exception .status_code ,
204259 ) from exception
205260 errors = payload .get ("errors" )
206261 data = json_dict (payload .get ("data" ))
207262 fields ["response_fields" ] = sorted (data )
208263 if errors :
209264 fields ["graphql_errors" ] = len (errors ) if isinstance (errors , list ) else 1
210265 if errors and not (self .tolerate_partial_errors and data ):
211- raise GraphQLError (f"{ self .label } GraphQL errors: { errors } " )
266+ raise GraphQLError (
267+ f"{ self .label } GraphQL errors: { errors } " ,
268+ is_application_error = True ,
269+ )
212270 return data
213271
214272
@@ -218,6 +276,49 @@ def operation_name(query: str) -> str:
218276 return match .group (1 ) if match else "anonymous"
219277
220278
def stream_connection_nodes(
    execute: Callable[[str, Mapping[str, JSONValue] | None], JSONDict],
    query: str,
    variables: Mapping[str, JSONValue] | None = None,
    *,
    connection_path: Sequence[str],
    page_size: int | None = None,
    first_variable: str = "first",
    after_variable: str = "after",
) -> Iterator[JSONDict]:
    """Lazily yield one GraphQL connection's nodes using any execute callable.

    `execute(query, variables)` is called once per page and must return the
    response data. `connection_path` is the key path from that data to the
    connection object carrying `nodes` and `pageInfo`.

    When `page_size` is given it is sent as `first_variable`. If the query
    text references `$<after_variable>` and the caller supplied no value for
    it, the first request sends it as null.

    Raises:
        GraphQLError: if `pageInfo.hasNextPage` is not a boolean, or if more
            pages are reported but the query does not reference
            `$<after_variable>`. Errors raised by `_node_page_at_path` and
            `_next_page_cursor` propagate unchanged.
    """
    request_variables: JSONDict = {} if variables is None else dict(variables)
    if page_size is not None:
        request_variables[first_variable] = page_size

    can_paginate = _query_uses_variable(query, after_variable)
    if can_paginate:
        # Keep a caller-supplied cursor; otherwise start with an explicit null.
        request_variables.setdefault(after_variable, None)

    path = tuple(connection_path)
    previous_cursor = request_variables.get(after_variable)
    while True:
        # `execute` receives a copy so it cannot disturb the running state.
        response = execute(query, dict(request_variables))
        connection = _node_page_at_path(response, path)
        for raw_node in json_list(connection.get("nodes")):
            yield json_dict(raw_node)

        page_info = json_dict(connection.get("pageInfo"))
        more_pages = page_info.get("hasNextPage")
        if more_pages is False:
            return
        if more_pages is not True:
            # Anything other than a real boolean (missing, null, string, ...).
            raise GraphQLError(
                f"GraphQL pagination path {_path_label(path)} missing pageInfo.hasNextPage"
            )
        if not can_paginate:
            raise GraphQLError(
                f"GraphQL query returned more pages but does not use ${after_variable}"
            )
        previous_cursor = _next_page_cursor(page_info, path, previous_cursor)
        request_variables[after_variable] = previous_cursor
321+
def _int_variable(variables: JSONDict, name: str) -> int | None:
    """Return ``variables[name]`` when it is a genuine integer, else ``None``.

    Args:
        variables: GraphQL variables mapping to look in.
        name: Key of the variable to read.

    Returns:
        The stored value if it is an ``int``; ``None`` when the key is
        missing or the value has any other type, including ``bool``.
    """
    value = variables.get(name)
    # bool is a subclass of int, so isinstance(True, int) is True; without
    # this guard a JSON true/false would be reported as the integer 1/0.
    if isinstance(value, bool) or not isinstance(value, int):
        return None
    return value
@@ -301,9 +402,7 @@ def _fetch_remaining_pages(
301402 target_page = _node_page_at_path (data , path )
302403 target_nodes = json_list (target_page .get ("nodes" ))
303404 page_info = json_dict (target_page .get ("pageInfo" ))
304- after = json_str (page_info , "endCursor" )
305- if not after :
306- raise GraphQLError (f"GraphQL pagination path { '.' .join (path )} missing pageInfo.endCursor" )
405+ after = _next_page_cursor (page_info , path , variables .get (after_variable ))
307406
308407 while after :
309408 page_variables = dict (variables )
@@ -322,11 +421,7 @@ def _fetch_remaining_pages(
322421 )
323422 if not has_next_page :
324423 return
325- after = json_str (next_page_info , "endCursor" )
326- if not after :
327- raise GraphQLError (
328- f"GraphQL pagination path { '.' .join (path )} missing pageInfo.endCursor"
329- )
424+ after = _next_page_cursor (next_page_info , path , after )
330425
331426
332427def _next_page_paths (data : JSONDict ) -> list [tuple [str , ...]]:
@@ -355,10 +450,27 @@ def _node_page_at_path(data: JSONDict, path: tuple[str, ...]) -> JSONDict:
355450 current = json_dict (current ).get (key )
356451 page = json_dict (current )
357452 if not page :
358- label = "." .join (path ) or "<root>"
359- raise GraphQLError (f"GraphQL response did not include pagination path { label } " )
453+ raise GraphQLError (f"GraphQL response did not include pagination path { _path_label (path )} " )
360454 return page
361455
362456
def _next_page_cursor(page_info: JSONDict, path: tuple[str, ...], current_cursor: object) -> str:
    """Return the cursor to request the next page with.

    Reads `endCursor` from `page_info` via `json_str` and checks it against
    the cursor used for the request that produced this page.

    Raises:
        GraphQLError: if `endCursor` is missing or empty, or if it equals
            `current_cursor` (when that is a string), since requesting the
            same cursor again would not make progress.
    """
    end_cursor = json_str(page_info, "endCursor")
    if end_cursor:
        # Only a string cursor can be compared; the first request may have
        # been made with a null cursor.
        stalled = isinstance(current_cursor, str) and end_cursor == current_cursor
        if not stalled:
            return end_cursor
        raise GraphQLError(
            f"GraphQL pagination path {_path_label(path)} stalled: "
            f"pageInfo.endCursor did not advance from {current_cursor!r}"
        )
    raise GraphQLError(
        f"GraphQL pagination path {_path_label(path)} missing pageInfo.endCursor"
    )
469+
470+
def _path_label(path: tuple[str, ...]) -> str:
    """Render a response path as dotted text for error messages.

    Falls back to ``"<root>"`` whenever the dotted form would be empty.
    """
    dotted = ".".join(path)
    if dotted:
        return dotted
    return "<root>"
473+
474+
def _query_uses_variable(query: str, variable: str) -> bool:
    """Report whether the query text contains ``$<variable>`` as a whole name.

    The variable name is matched literally and must end at a word boundary,
    so ``$after`` is not found inside ``$afterward``.
    """
    pattern = r"\$" + re.escape(variable) + r"\b"
    found = re.search(pattern, query)
    return bool(found)
0 commit comments