-
Notifications
You must be signed in to change notification settings - Fork 200
added heartbeat interval to check if query is still fetching data #588
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -660,6 +660,15 @@ def get(self, url: str) -> Response: | |||||
| def delete(self, url: str) -> Response: | ||||||
| return self._delete(url, timeout=self._request_timeout, proxies=PROXIES) | ||||||
|
|
||||||
| def send_heartbeat(self, url: str) -> Response: | ||||||
| """Send HEAD request for query heartbeat.""" | ||||||
| return self._http_session.head( | ||||||
| url, | ||||||
| headers=self.http_headers, | ||||||
| timeout=self._request_timeout, | ||||||
| proxies=PROXIES, | ||||||
| ) | ||||||
|
|
||||||
| @staticmethod | ||||||
| def _process_error(error, query_id: Optional[str]) -> Union[TrinoExternalError, TrinoQueryError, TrinoUserError]: | ||||||
| error_type = error["errorType"] | ||||||
|
|
@@ -817,7 +826,8 @@ def __init__( | |||||
| request: TrinoRequest, | ||||||
| query: str, | ||||||
| legacy_primitive_types: bool = False, | ||||||
| fetch_mode: Literal["mapped", "segments"] = "mapped" | ||||||
| fetch_mode: Literal["mapped", "segments"] = "mapped", | ||||||
| heartbeat_interval_seconds: Optional[float] = None, | ||||||
| ) -> None: | ||||||
| self._query_id: Optional[str] = None | ||||||
| self._stats: Dict[Any, Any] = {} | ||||||
|
|
@@ -835,6 +845,10 @@ def __init__( | |||||
| self._legacy_primitive_types = legacy_primitive_types | ||||||
| self._row_mapper: Optional[RowMapper] = None | ||||||
| self._fetch_mode = fetch_mode | ||||||
| self._heartbeat_interval_seconds = heartbeat_interval_seconds | ||||||
| self._heartbeat_enabled = False | ||||||
| self._heartbeat_thread: Optional[threading.Thread] = None | ||||||
| self._heartbeat_failures = 0 | ||||||
|
|
||||||
| @property | ||||||
| def query_id(self) -> Optional[str]: | ||||||
|
|
@@ -904,6 +918,10 @@ def execute(self, additional_http_headers=None) -> TrinoResult: | |||||
| rows = self._row_mapper.map(status.rows) if self._row_mapper else status.rows | ||||||
| self._result = TrinoResult(self, rows) | ||||||
|
|
||||||
| # Start heartbeat if interval is set and next_uri is available | ||||||
| if self._heartbeat_interval_seconds is not None and self._next_uri is not None: | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
isnt this the same? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. below as well |
||||||
| self._start_heartbeat() | ||||||
|
|
||||||
| # Execute should block until at least one row is received or query is finished or cancelled | ||||||
| while not self.finished and not self.cancelled and len(self._result.rows) == 0: | ||||||
| self._result.rows += self.fetch() | ||||||
|
|
@@ -928,8 +946,15 @@ def fetch(self) -> List[Union[List[Any]], Any]: | |||||
| raise trino.exceptions.TrinoConnectionError("failed to fetch: {}".format(e)) | ||||||
| status = self._request.process(response) | ||||||
| self._update_state(status) | ||||||
|
|
||||||
| # Start heartbeat if interval is set and next_uri is now available | ||||||
| if self._heartbeat_interval_seconds is not None and self._next_uri is not None and not self._heartbeat_enabled: | ||||||
| self._start_heartbeat() | ||||||
|
|
||||||
| if status.next_uri is None: | ||||||
| self._finished = True | ||||||
| # Stop heartbeat when query finishes | ||||||
| self._stop_heartbeat() | ||||||
|
|
||||||
| if not self._row_mapper: | ||||||
| return [] | ||||||
|
|
@@ -969,6 +994,9 @@ def cancel(self) -> None: | |||||
| if self._next_uri is None: | ||||||
| return | ||||||
|
|
||||||
| # Stop heartbeat when query is cancelled | ||||||
| self._stop_heartbeat() | ||||||
|
|
||||||
| logger.debug("cancelling query: %s", self.query_id) | ||||||
| try: | ||||||
| response = self._request.delete(self._next_uri) | ||||||
|
|
@@ -981,6 +1009,66 @@ def cancel(self) -> None: | |||||
|
|
||||||
| self._request.raise_response_error(response) | ||||||
|
|
||||||
| def _start_heartbeat(self) -> None: | ||||||
| """Start sending periodic heartbeat requests.""" | ||||||
| if self._heartbeat_interval_seconds is None or self._heartbeat_interval_seconds <= 0: | ||||||
| return | ||||||
|
|
||||||
| if self._heartbeat_enabled: | ||||||
| return | ||||||
|
|
||||||
| if self._next_uri is None: | ||||||
| return | ||||||
|
|
||||||
| self._heartbeat_enabled = True | ||||||
| self._heartbeat_failures = 0 | ||||||
|
|
||||||
| def heartbeat_loop(): | ||||||
| while self._heartbeat_enabled and not self._finished and not self._cancelled: | ||||||
| try: | ||||||
| if self._next_uri is None: | ||||||
| break | ||||||
|
|
||||||
| response = self._request.send_heartbeat(self._next_uri) | ||||||
| status_code = response.status_code | ||||||
|
|
||||||
| # Stop heartbeat on 404 or 405 (query not found or method not allowed) | ||||||
| if status_code in (404, 405): | ||||||
| self._heartbeat_enabled = False | ||||||
| break | ||||||
|
|
||||||
| # Reset failure count on success | ||||||
| if status_code == 200: | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why only 200 and not all 200<=x<300 values? |
||||||
| self._heartbeat_failures = 0 | ||||||
| else: | ||||||
| self._heartbeat_failures += 1 | ||||||
| # Stop after 3 consecutive failures | ||||||
| if self._heartbeat_failures >= 3: | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You drop heartbeat silently after 3 failures. User has no idea. |
||||||
| self._heartbeat_enabled = False | ||||||
| break | ||||||
|
|
||||||
| except Exception: | ||||||
| # On any exception, increment failure count | ||||||
| self._heartbeat_failures += 1 | ||||||
| if self._heartbeat_failures >= 3: | ||||||
| self._heartbeat_enabled = False | ||||||
| break | ||||||
|
|
||||||
| # Sleep for the heartbeat interval | ||||||
| sleep(self._heartbeat_interval_seconds) | ||||||
|
|
||||||
| self._heartbeat_thread = threading.Thread(target=heartbeat_loop, daemon=True) | ||||||
| self._heartbeat_thread.start() | ||||||
|
|
||||||
| def _stop_heartbeat(self) -> None: | ||||||
| """Stop sending heartbeat requests.""" | ||||||
| if not self._heartbeat_enabled: | ||||||
| return | ||||||
|
|
||||||
| self._heartbeat_enabled = False | ||||||
| if self._heartbeat_thread is not None: | ||||||
| self._heartbeat_thread.join(timeout=1.0) | ||||||
|
|
||||||
| def is_finished(self) -> bool: | ||||||
| import warnings | ||||||
| warnings.warn("is_finished is deprecated, use finished instead", DeprecationWarning) | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.