From 20865912e9f8c538642747b0fb024d7f9dc3d403 Mon Sep 17 00:00:00 2001 From: Scott Kruyswyk Date: Wed, 17 Dec 2025 10:57:01 -0700 Subject: [PATCH 1/8] use IQS with http instead of witchcraft service --- .../client/internal_query_client.py | 30 ++----------------- 1 file changed, 3 insertions(+), 27 deletions(-) diff --git a/compute_modules/client/internal_query_client.py b/compute_modules/client/internal_query_client.py index 5cece41..8dfefa4 100644 --- a/compute_modules/client/internal_query_client.py +++ b/compute_modules/client/internal_query_client.py @@ -15,7 +15,6 @@ import json import os -import ssl import time import traceback from typing import Any, Callable, Dict, Iterable, List @@ -58,15 +57,12 @@ def __init__( self.is_function_context_typed = is_function_context_typed self.streaming = streaming self.host = os.environ["RUNTIME_HOST"] - self.port = int(os.environ["RUNTIME_PORT"]) + self.port = int(os.environ["RUNTIME_PORT_V2"]) self.get_job_path = _extract_path_from_url(os.environ["GET_JOB_URI"]) self.post_result_path = _extract_path_from_url(os.environ["POST_RESULT_URI"]) self.post_schema_path = _extract_path_from_url(os.environ["POST_SCHEMA_URI"]) self.post_restart_path = _extract_path_from_url(os.environ["RESTART_NOTIFICATION_URI"]) - self._initialize_auth_token() self._initialize_headers() - self.certPath = os.environ["CONNECTIONS_TO_OTHER_PODS_CA_PATH"] - self.context = ssl.create_default_context(cafile=self.certPath) self.connection_refused_count: int = 0 self.concurrency = int(os.environ.get("MAX_CONCURRENT_TASKS", 1)) self.logger = get_internal_logger() @@ -84,22 +80,9 @@ def _set_logger_process_id(self, process_id: int) -> None: """Set the process_id for internal & public logger""" COMPUTE_MODULES_ADAPTER_MANAGER.update_process_id(process_id=process_id) - def _initialize_auth_token(self) -> None: - try: - with open(os.environ["MODULE_AUTH_TOKEN"], "r", encoding="utf-8") as f: - self.moduleAuthToken = f.read() - except Exception as e: - self.logger.error(f"Failed to read auth token: {str(e)}") - raise - def _initialize_headers(self) -> None: - self.get_job_headers = {"Module-Auth-Token": self.moduleAuthToken} - self.post_result_headers = { - "Content-Type": "application/octet-stream", - "Module-Auth-Token": self.moduleAuthToken, - } - self.post_schema_headers = {"Content-Type": "application/json", "Module-Auth-Token": self.moduleAuthToken} - self.post_restart_headers = {"Module-Auth-Token": self.moduleAuthToken} + self.post_result_headers = {"Content-Type": "application/octet-stream"} + self.post_schema_headers = {"Content-Type": "application/json"} def _iterable_to_json_generator(self, iterable: Iterable[Any]) -> Iterable[bytes]: self.logger.debug("iterating over result") @@ -124,7 +107,6 @@ def post_query_schemas(self) -> None: url=self.build_url(self.post_schema_path), json=self.function_schemas, headers=self.post_schema_headers, - verify=self.certPath, ) as response: self.logger.debug( f"POST /schemas response status: {response.status_code} reason: {response.reason}" @@ -144,8 +126,6 @@ def get_job_or_none(self) -> Any: with self.session.request( method="GET", url=self.build_url(self.get_job_path), - headers=self.get_job_headers, - verify=self.certPath, ) as response: result = None if response.status_code == 200: @@ -174,7 +154,6 @@ def report_job_result_failed(self, post_result_url: str, error: str) -> None: url=post_result_url, headers=self.post_result_headers, data=json.dumps({"error": error}).encode("utf-8"), - verify=self.certPath, ) as response: if response.status_code == 204: self.logger.debug("Successfully reported that job result posting has failed") @@ -199,7 +178,6 @@ def report_job_result(self, job_id: str, body: Any) -> None: url=post_result_url, headers=self.post_result_headers, data=body, - verify=self.certPath, ) as response: if response.status_code == 204: self.logger.debug("Successfully reported job result") @@ -295,8 +273,6 @@ def report_restart(self) -> None: with self.session.request( method="POST", url=post_restart_url, - headers=self.post_restart_headers, - verify=self.certPath, ) as response: self.logger.debug( f"Reporting restart response status: {response.status_code} reason: {response.reason}" From c8a52e478381e370c8a689e2007e5670ab7cdcbc Mon Sep 17 00:00:00 2001 From: svc-autorelease Date: Wed, 17 Dec 2025 18:14:35 +0000 Subject: [PATCH 2/8] Release 0.29.0-rc1 [skip ci] From 9b9c1a41a531b06dc1ab7f00441f8ca0c1d99185 Mon Sep 17 00:00:00 2001 From: Scott Kruyswyk Date: Wed, 17 Dec 2025 11:34:04 -0700 Subject: [PATCH 3/8] ope --- compute_modules/client/internal_query_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compute_modules/client/internal_query_client.py b/compute_modules/client/internal_query_client.py index 8dfefa4..b24e03e 100644 --- a/compute_modules/client/internal_query_client.py +++ b/compute_modules/client/internal_query_client.py @@ -95,7 +95,7 @@ def init_session(self) -> None: self.session = requests.Session() def build_url(self, path: str) -> str: - return f"https://{self.host}:{self.port}{path}" + return f"http://{self.host}:{self.port}{path}" def post_query_schemas(self) -> None: """Post the function schemas of the Compute Module""" From 9618014d0a88efe51dae7723cde91415e82e90f3 Mon Sep 17 00:00:00 2001 From: svc-autorelease Date: Wed, 17 Dec 2025 18:38:00 +0000 Subject: [PATCH 4/8] Release 0.29.0-rc2 [skip ci] From 9c2945f223e834cf238c1e963293fa5dd255f19f Mon Sep 17 00:00:00 2001 From: Scott Kruyswyk Date: Wed, 17 Dec 2025 11:57:28 -0700 Subject: [PATCH 5/8] fix more dumb stuff --- .../client/internal_query_client.py | 32 ++++++------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/compute_modules/client/internal_query_client.py b/compute_modules/client/internal_query_client.py index b24e03e..7ee8e1b 100644 --- a/compute_modules/client/internal_query_client.py +++ b/compute_modules/client/internal_query_client.py @@ -18,7 +18,6 @@ import time import traceback from typing import Any, Callable, Dict, Iterable, List -from urllib.parse import urlparse import requests @@ -37,11 +36,6 @@ POST_RESTART_MAX_ATTEMPTS = 5 -def _extract_path_from_url(url: str) -> str: - parsed_url = urlparse(url) - return parsed_url.path - - class InternalQueryService: def __init__( self, @@ -56,12 +50,10 @@ def __init__( self.function_schema_conversions = function_schema_conversions self.is_function_context_typed = is_function_context_typed self.streaming = streaming - self.host = os.environ["RUNTIME_HOST"] - self.port = int(os.environ["RUNTIME_PORT_V2"]) - self.get_job_path = _extract_path_from_url(os.environ["GET_JOB_URI"]) - self.post_result_path = _extract_path_from_url(os.environ["POST_RESULT_URI"]) - self.post_schema_path = _extract_path_from_url(os.environ["POST_SCHEMA_URI"]) - self.post_restart_path = _extract_path_from_url(os.environ["RESTART_NOTIFICATION_URI"]) + self.get_job_url = os.environ["GET_JOB_URI_V2"] + self.post_result_url = os.environ["POST_RESULT_URI_V2"] + self.post_schema_url = os.environ["POST_SCHEMA_URI_V2"] + self.post_restart_url = os.environ["RESTART_NOTIFICATION_URI_V2"] self._initialize_headers() self.connection_refused_count: int = 0 self.concurrency = int(os.environ.get("MAX_CONCURRENT_TASKS", 1)) @@ -94,17 +86,15 @@ def init_session(self) -> None: """Initialize requests.Session""" self.session = requests.Session() - def build_url(self, path: str) -> str: - return f"http://{self.host}:{self.port}{path}" - def post_query_schemas(self) -> None: """Post the function schemas of the Compute Module""" self.logger.debug(f"Posting function schemas: {self.function_schemas}") + self.logger.debug(f"post_schema_url: {self.post_schema_url}") for i in range(POST_SCHEMAS_MAX_ATTEMPTS): try: with self.session.request( method="POST", - url=self.build_url(self.post_schema_path), + url=self.post_schema_url, json=self.function_schemas, headers=self.post_schema_headers, ) as response: @@ -125,7 +115,7 @@ def get_job_or_none(self) -> Any: try: with self.session.request( method="GET", - url=self.build_url(self.get_job_path), + url=self.get_job_url, ) as response: result = None if response.status_code == 200: @@ -168,8 +158,7 @@ def report_job_result_failed(self, post_result_url: str, error: str) -> None: raise RuntimeError(f"Unable to report that post result has failed after {POST_ERROR_MAX_ATTEMPTS} attempts") def report_job_result(self, job_id: str, body: Any) -> None: - post_result_path = f"{self.post_result_path}/{job_id}" - post_result_url = self.build_url(post_result_path) + post_result_url = f"{self.post_result_url}/{job_id}" self.logger.debug(f"Posting result to {post_result_url}") for _ in range(POST_RESULT_MAX_ATTEMPTS): try: @@ -265,14 +254,13 @@ def get_failed_query(exception: Exception) -> Dict[str, str]: return {"exception": f"{str(exception)}: {traceback.format_exc()}"} def report_restart(self) -> None: - post_restart_url = self.build_url(self.post_restart_path) - self.logger.debug(f"Reporting restart to {post_restart_url}") + self.logger.debug(f"Reporting restart to {self.post_restart_url}") for _ in range(POST_RESTART_MAX_ATTEMPTS): try: with self.session.request( method="POST", - url=post_restart_url, + url=self.post_restart_url, ) as response: self.logger.debug( f"Reporting restart response status: {response.status_code} reason: {response.reason}" From 16b29c1ebe5387789808b63fa43e278dad8d0372 Mon Sep 17 00:00:00 2001 From: Scott Kruyswyk Date: Wed, 17 Dec 2025 12:17:35 -0700 Subject: [PATCH 6/8] annoying --- compute_modules/client/internal_query_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/compute_modules/client/internal_query_client.py b/compute_modules/client/internal_query_client.py index 7ee8e1b..1bb6825 100644 --- a/compute_modules/client/internal_query_client.py +++ b/compute_modules/client/internal_query_client.py @@ -53,7 +53,8 @@ def __init__( self.get_job_url = os.environ["GET_JOB_URI_V2"] self.post_result_url = os.environ["POST_RESULT_URI_V2"] self.post_schema_url = os.environ["POST_SCHEMA_URI_V2"] - self.post_restart_url = os.environ["RESTART_NOTIFICATION_URI_V2"] + # self.post_restart_url = os.environ["RESTART_NOTIFICATION_URI_V2"] + self.post_restart_url = f"http://{os.environ['RUNTIME_API_V2']}/restart-notify" self._initialize_headers() self.connection_refused_count: int = 0 self.concurrency = int(os.environ.get("MAX_CONCURRENT_TASKS", 1)) From 248813756d8e3042bca04dd8805e7e64c562a71e Mon Sep 17 00:00:00 2001 From: Scott Kruyswyk Date: Wed, 17 Dec 2025 14:17:20 -0700 Subject: [PATCH 7/8] bugs --- compute_modules/client/internal_query_client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/compute_modules/client/internal_query_client.py b/compute_modules/client/internal_query_client.py index 1bb6825..cb2f5f0 100644 --- a/compute_modules/client/internal_query_client.py +++ b/compute_modules/client/internal_query_client.py @@ -54,7 +54,7 @@ def __init__( self.post_result_url = os.environ["POST_RESULT_URI_V2"] self.post_schema_url = os.environ["POST_SCHEMA_URI_V2"] # self.post_restart_url = os.environ["RESTART_NOTIFICATION_URI_V2"] - self.post_restart_url = f"http://{os.environ['RUNTIME_API_V2']}/restart-notify" + self.post_restart_url = f"{os.environ['RUNTIME_API_V2']}/restart-notify" self._initialize_headers() self.connection_refused_count: int = 0 self.concurrency = int(os.environ.get("MAX_CONCURRENT_TASKS", 1)) @@ -169,7 +169,8 @@ def report_job_result(self, job_id: str, body: Any) -> None: headers=self.post_result_headers, data=body, ) as response: - if response.status_code == 204: + # HTTP version returns 202 while witchcraft returns 204 + if response.status_code in (202, 204): self.logger.debug("Successfully reported job result") return else: From b7b3081b372cba1abdd745b16f3e164da14498c3 Mon Sep 17 00:00:00 2001 From: Scott Kruyswyk Date: Wed, 17 Dec 2025 14:55:30 -0700 Subject: [PATCH 8/8] another one --- compute_modules/client/internal_query_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/compute_modules/client/internal_query_client.py b/compute_modules/client/internal_query_client.py index cb2f5f0..fafa05c 100644 --- a/compute_modules/client/internal_query_client.py +++ b/compute_modules/client/internal_query_client.py @@ -146,7 +146,8 @@ def report_job_result_failed(self, post_result_url: str, error: str) -> None: headers=self.post_result_headers, data=json.dumps({"error": error}).encode("utf-8"), ) as response: - if response.status_code == 204: + # HTTP version returns 202 while witchcraft returns 204 + if response.status_code in (202, 204): self.logger.debug("Successfully reported that job result posting has failed") return else: