From b4090c81c8e06486d0ba0d01c1b443fcc5139e8a Mon Sep 17 00:00:00 2001 From: Mateus Silva Date: Fri, 14 Nov 2025 11:04:53 -0300 Subject: [PATCH 1/7] feat: enhance Analyses class with snippet methods and improved documentation Added listSnippets() and getSnippetFile() methods to fetch analysis code examples from TagoIO's public repository. Enhanced all existing methods with comprehensive docstrings following the Account class pattern, including descriptions, references, and practical examples. Updated type definitions to support new snippet functionality with SnippetRuntime, SnippetItem, and SnippetsListResponse types. Expanded test coverage with 8 new test cases and updated RST documentation with detailed method descriptions and code examples. --- .../Resources/Analysis/Analysis_Type.rst | 65 ++++ docs/source/Resources/Analysis/index.rst | 292 ++++++++++++------ src/tagoio_sdk/modules/Resources/Analyses.py | 272 ++++++++++++++-- .../modules/Resources/Analysis_Types.py | 44 ++- tests/Resources/test_analyses.py | 164 ++++++++-- 5 files changed, 699 insertions(+), 138 deletions(-) diff --git a/docs/source/Resources/Analysis/Analysis_Type.rst b/docs/source/Resources/Analysis/Analysis_Type.rst index d7b7e83..e0fa082 100644 --- a/docs/source/Resources/Analysis/Analysis_Type.rst +++ b/docs/source/Resources/Analysis/Analysis_Type.rst @@ -102,3 +102,68 @@ AnalysisListItem | locked_at: Optional[datetime] | console: Optional[List[str]] + + +.. _SnippetRuntime: + +SnippetRuntime +-------------- + + Available runtime environments for snippets. + + **Type:** + + | Literal["node-legacy", "python-legacy", "node-rt2025", "python-rt2025", "deno-rt2025"] + + +.. _SnippetItem: + +SnippetItem +----------- + + Individual snippet metadata. + + **Attributes:** + + | id: str + | Unique identifier for the snippet + + | title: str + | Human-readable title + + | description: str + | Description of what the snippet does + + | language: str + | Programming language (typescript, javascript, python) + + | tags: List[str] + | Array of tags for categorization + + | filename: str + | Filename of the snippet + + | file_path: str + | Full path to the file in the runtime directory + + +.. _SnippetsListResponse: + +SnippetsListResponse +-------------------- + + API response containing all snippets metadata for a runtime. + + **Attributes:** + + | runtime: :ref:`SnippetRuntime` + | Runtime environment identifier + + | schema_version: int + | Schema version for the API response format + + | generated_at: str + | ISO timestamp when the response was generated + + | snippets: List[:ref:`SnippetItem`] + | Array of all available snippets for this runtime diff --git a/docs/source/Resources/Analysis/index.rst b/docs/source/Resources/Analysis/index.rst index 7086b44..46a95ca 100644 --- a/docs/source/Resources/Analysis/index.rst +++ b/docs/source/Resources/Analysis/index.rst @@ -1,13 +1,16 @@ **Analysis** ============ -Manage analysis in account. +Manage analysis in your application. ======= list ======= -Retrieves a list with all analyses from the account +Lists all analyses from the application with pagination support. +Use this to retrieve and manage analyses in your application. + +See: `Analysis `_ **Parameters:** @@ -17,238 +20,343 @@ Retrieves a list with all analyses from the account .. code-block:: :caption: **Default queryObj:** - queryObj: { + queryObj = { "page": 1, "fields": ["id", "name"], "filter": {}, "amount": 20, - "orderBy": ["name","asc"], + "orderBy": ["name", "asc"] } **Returns:** | list[:ref:`AnalysisListItem`] - .. code-block:: - :caption: **Example:** + .. code-block:: python - from tagoio_sdk import Resources + # If receive an error "Authorization Denied", check policy "Analysis" / "Access" in Access Management. + from tagoio_sdk import Resources - resources = Resources() - resources.analysis.list() + resources = Resources() + list_result = resources.analyses.list({ + "page": 1, + "fields": ["id", "name"], + "amount": 10, + "orderBy": ["name", "asc"] + }) + print(list_result) # [{'id': 'analysis-id-123', 'name': 'Analysis Test', ...}] ======= create ======= -Create a new analysis +Creates a new analysis in your application. + +See: `Creating Analysis `_ **Parameters:** - | **analysisInfo**: :ref:`AnalysisCreateInfo` - | Analysis information + | **analysisObj**: :ref:`AnalysisCreateInfo` + | Data object to create new TagoIO Analysis **Returns:** | Dict[str, GenericID | GenericToken] - .. code-block:: - :caption: **Example:** + .. code-block:: python - from tagoio_sdk import Resources + # If receive an error "Authorization Denied", check policy "Analysis" / "Create" in Access Management. + from tagoio_sdk import Resources - resources = Resources() - resources.analysis.create({ - "name": "My Analysis", - "runtime": "python", - "active": True, - }) + resources = Resources() + new_analysis = resources.analyses.create({ + "name": "My Analysis", + "runtime": "python", + "tags": [{"key": "type", "value": "data-processing"}] + }) + print(new_analysis["id"], new_analysis["token"]) # analysis-id-123, analysis-token-123 ======= edit ======= -Modify any property of the analyze +Modifies an existing analysis. + +See: `Analysis `_ **Parameters:** - | **analysisID**: GenericID: str - | Analysis ID + | **analysisID**: str + | Analysis identification - | **analysisInfo**: :ref:`AnalysisCreateInfo` - | Analysis information + | **analysisObj**: :ref:`AnalysisInfo` + | Analysis object with data to replace **Returns:** | string - .. code-block:: - :caption: **Example:** + .. code-block:: python - from tagoio_sdk import Resources + # If receive an error "Authorization Denied", check policy "Analysis" / "Create" in Access Management. + from tagoio_sdk import Resources - resources = Resources() - resources.analysis.edit("analysisID", { "name": "My Analysis Edited" }) + resources = Resources() + result = resources.analyses.edit("analysis-id-123", { + "name": "Updated Analysis", + "active": False + }) + print(result) # Successfully Updated ======= delete ======= -Deletes an analysis from the account +Deletes an analysis from your application. + +See: `Analysis `_ **Parameters:** - | **analysisID**: GenericID: str - | Analysis ID + | **analysisID**: str + | Analysis identification **Returns:** | string - .. code-block:: - :caption: **Example:** + .. code-block:: python - from tagoio_sdk import Resources + # If receive an error "Authorization Denied", check policy "Analysis" / "Delete" in Access Management. + from tagoio_sdk import Resources - resources = Resources() - resources.analysis.delete("analysisID") + resources = Resources() + result = resources.analyses.delete("analysis-id-123") + print(result) # Successfully Removed ======= info ======= -Gets information about an analysis +Retrieves detailed information about a specific analysis. + +See: `Analysis `_ **Parameters:** - | **analysisID**: GenericID: str - | Analysis ID + | **analysisID**: str + | Analysis identification **Returns:** | :ref:`AnalysisInfo` - .. code-block:: - :caption: **Example:** + .. code-block:: python - from tagoio_sdk import Resources + # If receive an error "Authorization Denied", check policy "Analysis" / "Access" in Access Management. + from tagoio_sdk import Resources - resources = Resources() - resources.analysis.info("analysisID") + resources = Resources() + analysis_info = resources.analyses.info("analysis-id-123") + print(analysis_info) # {'id': 'analysis-id-123', 'name': 'My Analysis', ...} ======= run ======= -Run an analysis +Executes an analysis with optional scope parameters. + +See: `Analysis `_ **Parameters:** - | **analysisID**: GenericID: str - | Analysis ID + | **analysisID**: str + | Analysis identification + + | *Optional* **scopeObj**: Dict[str, Any] + | Simulate scope for analysis **Returns:** | Dict[str, GenericToken] - .. code-block:: - :caption: **Example:** + .. code-block:: python - from tagoio_sdk import Resources + # If receive an error "Authorization Denied", check policy "Analysis" / "Run Analysis" in Access Management. + from tagoio_sdk import Resources - resources = Resources() - resources.analysis.run("analysisID") + resources = Resources() + result = resources.analyses.run("analysis-id-123", {"environment": "production"}) + print(result["analysis_token"]) # analysis-token-123 ============= tokenGenerate ============= -Generate a new token for the analysis +Generates a new token for the analysis. +This is only allowed when the analysis is running in external mode. + +See: `Analysis `_ **Parameters:** - | **analysisID**: GenericID: str - | Analysis ID + | **analysisID**: str + | Analysis identification **Returns:** | Dict[str, str] - .. code-block:: - :caption: **Example:** + .. code-block:: python - from tagoio_sdk import Resources + from tagoio_sdk import Resources - resources = Resources() - resources.analysis.tokenGenerate("analysisID") + resources = Resources() + token = resources.analyses.tokenGenerate("analysis-id-123") + print(token["analysis_token"]) # analysis-token-123 ============ uploadScript ============ -Upload a file (base64) to Analysis. Automatically erase the old one +Uploads a script file to an analysis. +The file content must be base64-encoded. This automatically replaces the old script. + +See: `Analysis `_ **Parameters:** - | **analysisID**: GenericID: str - | Analysis ID + | **analysisID**: str + | Analysis identification - | **file**: :ref:`ScriptFile` - | File information + | **fileObj**: :ref:`ScriptFile` + | Object with name, language and content (base64) of the file **Returns:** | string - .. code-block:: - :caption: **Example:** + .. code-block:: python - from tagoio_sdk import Resources - import base64 + # If receive an error "Authorization Denied", check policy "Analysis" / "Upload Analysis Script" in Access Management. + from tagoio_sdk import Resources - data = "print(Hello, World!)" - encoded_bytes = base64.b64encode(data.encode('utf-8')).decode('utf-8') - - resources = Resources() - resources.analysis.uploadScript("analysisID", { - "name": "My Script", - "content": encoded_bytes, - "language": "python", - }) + resources = Resources() + result = resources.analyses.uploadScript("analysis-id-123", { + "name": "script.py", + "content": "base64-encoded-content", + "language": "python" + }) + print(result) # Successfully Uploaded ============== downloadScript ============== -Get a url to download the analysis. If `version` is specified in `options`, downloads a specific version. +Gets a download URL for the analysis script. +If version is specified in options, downloads a specific version. + +See: `Analysis `_ **Parameters:** - | **analysisID**: GenericID: str - | Analysis ID + | **analysisID**: str + | Analysis identification - | *Optional* **options**: Dict["version", int] - | Options + | *Optional* **options**: Dict[Literal["version"], int] + | Options for the Analysis script to download (e.g., {"version": 1}) **Returns:** - | Dict[str, Any] + | Dict + + .. code-block:: python + + # If receive an error "Authorization Denied", check policy "Analysis" / "Download Analysis Script" in Access Management. + from tagoio_sdk import Resources + + resources = Resources() + download = resources.analyses.downloadScript("analysis-id-123", {"version": 1}) + print(download["url"]) # https://... + print(download["expire_at"]) # 2025-01-13T... + + +============ +listSnippets +============ + +Get all available snippets for a specific runtime environment. +Fetches analysis code snippets from the public TagoIO snippets repository. + +See: `Script Examples `_ + +See: `Script Editor `_ + + **Parameters:** + + | **runtime**: :ref:`SnippetRuntime` + | The runtime environment to get snippets for + + **Returns:** + + | :ref:`SnippetsListResponse` + + .. code-block:: python + + from tagoio_sdk import Resources + + resources = Resources() + deno_snippets = resources.analyses.listSnippets("deno-rt2025") + + # Print all snippet titles + for snippet in deno_snippets["snippets"]: + print(f"{snippet['title']}: {snippet['description']}") + + +============== +getSnippetFile +============== + +Get the raw source code content of a specific snippet file. +Fetches the actual code content from the TagoIO snippets repository. + +See: `Script Examples `_ + +See: `Script Editor `_ + + **Parameters:** + + | **runtime**: :ref:`SnippetRuntime` + | The runtime environment the snippet belongs to + + | **filename**: str + | The filename of the snippet to retrieve + + **Returns:** + + | str + + .. code-block:: python + + from tagoio_sdk import Resources - .. code-block:: - :caption: **Example:** + resources = Resources() - from tagoio_sdk import Resources + # Get TypeScript code for console example + code = resources.analyses.getSnippetFile("deno-rt2025", "console.ts") + print(code) - resources = Resources() - resources.analysis.downloadScript("analysisID") + # Get Python code for data processing + python_code = resources.analyses.getSnippetFile("python-rt2025", "avg-min-max.py") + print(python_code) .. toctree:: diff --git a/src/tagoio_sdk/modules/Resources/Analyses.py b/src/tagoio_sdk/modules/Resources/Analyses.py index af19462..eb471df 100644 --- a/src/tagoio_sdk/modules/Resources/Analyses.py +++ b/src/tagoio_sdk/modules/Resources/Analyses.py @@ -4,6 +4,8 @@ from typing import Literal from typing import Optional +import requests + from tagoio_sdk.common.Common_Type import GenericID from tagoio_sdk.common.Common_Type import GenericToken from tagoio_sdk.common.tagoio_module import TagoIOModule @@ -12,26 +14,42 @@ from tagoio_sdk.modules.Resources.Analysis_Types import AnalysisListItem from tagoio_sdk.modules.Resources.Analysis_Types import AnalysisQuery from tagoio_sdk.modules.Resources.Analysis_Types import ScriptFile +from tagoio_sdk.modules.Resources.Analysis_Types import SnippetRuntime +from tagoio_sdk.modules.Resources.Analysis_Types import SnippetsListResponse from tagoio_sdk.modules.Utils.dateParser import dateParser from tagoio_sdk.modules.Utils.dateParser import dateParserList +# Base URL for TagoIO analysis snippets repository +SNIPPETS_BASE_URL = "https://snippets.tago.io" + + class Analyses(TagoIOModule): def list(self, queryObj: Optional[AnalysisQuery] = None) -> List[AnalysisListItem]: """ - Retrieves a list with all analyses from the account + @description: + Lists all analyses from the application with pagination support. + Use this to retrieve and manage analyses in your application. - :default: + @see: + https://docs.tago.io/docs/tagoio/analysis/ Analysis - queryObj: { + @example: + If receive an error "Authorization Denied", check policy **Analysis** / **Access** in Access Management. + ```python + resources = Resources() + list_result = resources.analyses.list({ "page": 1, "fields": ["id", "name"], - "filter": {}, - "amount": 20, - "orderBy": ["name", "asc"], - } + "amount": 10, + "orderBy": ["name", "asc"] + }) + print(list_result) # [{'id': 'analysis-id-123', 'name': 'Analysis Test', ...}] + ``` - :param AnalysisQuery queryObj: Search query params + :param AnalysisQuery queryObj: Search query params (optional) + :return: List of analysis items matching the query + :rtype: List[AnalysisListItem] """ queryObj = queryObj or {} orderBy = f"{queryObj.get('orderBy', ['name', 'asc'])[0]},{queryObj.get('orderBy', ['name', 'asc'])[1]}" @@ -55,9 +73,27 @@ def list(self, queryObj: Optional[AnalysisQuery] = None) -> List[AnalysisListIte def create(self, analysisObj: AnalysisCreateInfo) -> Dict[str, GenericID | GenericToken]: """ - Create a new analyze + @description: + Creates a new analysis in your application. + + @see: + https://help.tago.io/portal/en/kb/articles/120-creating-analysis Creating Analysis + + @example: + If receive an error "Authorization Denied", check policy **Analysis** / **Create** in Access Management. + ```python + resources = Resources() + new_analysis = resources.analyses.create({ + "name": "My Analysis", + "runtime": "python", + "tags": [{"key": "type", "value": "data-processing"}] + }) + print(new_analysis["id"], new_analysis["token"]) # analysis-id-123, analysis-token-123 + ``` - :param AnalysisCreateInfo analysisObj: Data object to create new TagoIO Analyze + :param AnalysisCreateInfo analysisObj: Data object to create new TagoIO Analysis + :return: Dictionary with the new analysis ID and token + :rtype: Dict[str, GenericID | GenericToken] """ result = self.doRequest( { @@ -70,10 +106,27 @@ def create(self, analysisObj: AnalysisCreateInfo) -> Dict[str, GenericID | Gener def edit(self, analysisID: GenericID, analysisObj: AnalysisInfo) -> str: """ - Modify any property of the analyze + @description: + Modifies an existing analysis. - :param GenericID analysisID: Analyze identification - :param Partial[AnalysisInfo] analysisObj: Analyze Object with data to replace + @see: + https://docs.tago.io/docs/tagoio/analysis/ Analysis + + @example: + If receive an error "Authorization Denied", check policy **Analysis** / **Create** in Access Management. + ```python + resources = Resources() + result = resources.analyses.edit("analysis-id-123", { + "name": "Updated Analysis", + "active": False + }) + print(result) # Successfully Updated + ``` + + :param GenericID analysisID: Analysis identification + :param AnalysisInfo analysisObj: Analysis object with data to replace + :return: Success message + :rtype: str """ result = self.doRequest( { @@ -86,9 +139,23 @@ def edit(self, analysisID: GenericID, analysisObj: AnalysisInfo) -> str: def delete(self, analysisID: GenericID) -> str: """ - Deletes an analyze from the account + @description: + Deletes an analysis from your application. + + @see: + https://docs.tago.io/docs/tagoio/analysis/ Analysis + + @example: + If receive an error "Authorization Denied", check policy **Analysis** / **Delete** in Access Management. + ```python + resources = Resources() + result = resources.analyses.delete("analysis-id-123") + print(result) # Successfully Removed + ``` - :param GenericID analysisID: Analyze identification + :param GenericID analysisID: Analysis identification + :return: Success message + :rtype: str """ result = self.doRequest( { @@ -100,9 +167,23 @@ def delete(self, analysisID: GenericID) -> str: def info(self, analysisID: GenericID) -> AnalysisInfo: """ - Gets information about the analyze + @description: + Retrieves detailed information about a specific analysis. + + @see: + https://docs.tago.io/docs/tagoio/analysis/ Analysis + + @example: + If receive an error "Authorization Denied", check policy **Analysis** / **Access** in Access Management. + ```python + resources = Resources() + analysis_info = resources.analyses.info("analysis-id-123") + print(analysis_info) # {'id': 'analysis-id-123', 'name': 'My Analysis', ...} + ``` - :param GenericID analysisID: Analyze identification + :param GenericID analysisID: Analysis identification + :return: Detailed analysis information + :rtype: AnalysisInfo """ result = self.doRequest( { @@ -115,10 +196,24 @@ def info(self, analysisID: GenericID) -> AnalysisInfo: def run(self, analysisID: GenericID, scopeObj: Optional[Dict[str, Any]] = None) -> Dict[str, GenericToken]: """ - Force analyze to run + @description: + Executes an analysis with optional scope parameters. + + @see: + https://docs.tago.io/docs/tagoio/analysis/ Analysis + + @example: + If receive an error "Authorization Denied", check policy **Analysis** / **Run Analysis** in Access Management. + ```python + resources = Resources() + result = resources.analyses.run("analysis-id-123", {"environment": "production"}) + print(result["analysis_token"]) # analysis-token-123 + ``` - :param GenericID analysisID: Analyze identification + :param GenericID analysisID: Analysis identification :param Optional[Dict[str, Any]] scopeObj: Simulate scope for analysis + :return: Dictionary containing the analysis token + :rtype: Dict[str, GenericToken] """ result = self.doRequest( { @@ -131,9 +226,23 @@ def run(self, analysisID: GenericID, scopeObj: Optional[Dict[str, Any]] = None) def tokenGenerate(self, analysisID: GenericID) -> Dict[str, str]: """ - Generate a new token for the analysis + @description: + Generates a new token for the analysis. + This is only allowed when the analysis is running in external mode. + + @see: + https://docs.tago.io/docs/tagoio/analysis/ Analysis + + @example: + ```python + resources = Resources() + token = resources.analyses.tokenGenerate("analysis-id-123") + print(token["analysis_token"]) # analysis-token-123 + ``` - :param GenericID analysisID: Analyze identification + :param GenericID analysisID: Analysis identification + :return: Dictionary containing the new analysis token + :rtype: Dict[str, str] """ result = self.doRequest( { @@ -145,10 +254,29 @@ def tokenGenerate(self, analysisID: GenericID) -> Dict[str, str]: def uploadScript(self, analysisID: GenericID, fileObj: ScriptFile) -> str: """ - Upload a file (base64) to Analysis. Automatically erase the old one + @description: + Uploads a script file to an analysis. + The file content must be base64-encoded. This automatically replaces the old script. + + @see: + https://docs.tago.io/docs/tagoio/analysis/ Analysis - :param GenericID analysisID: Analyze identification - :param ScriptFile fileObj: Object with name, language and content of the file + @example: + If receive an error "Authorization Denied", check policy **Analysis** / **Upload Analysis Script** in Access Management. + ```python + resources = Resources() + result = resources.analyses.uploadScript("analysis-id-123", { + "name": "script.py", + "content": "base64-encoded-content", + "language": "python" + }) + print(result) # Successfully Uploaded + ``` + + :param GenericID analysisID: Analysis identification + :param ScriptFile fileObj: Object with name, language and content (base64) of the file + :return: Success message + :rtype: str """ result = self.doRequest( { @@ -163,12 +291,32 @@ def uploadScript(self, analysisID: GenericID, fileObj: ScriptFile) -> str: ) return result - def downloadScript(self, analysisID: GenericID, options: Optional[Dict[Literal["version"], int]] = None) -> Dict: + def downloadScript( + self, + analysisID: GenericID, + options: Optional[Dict[Literal["version"], int]] = None, + ) -> Dict: """ - Get a url to download the analysis. If `version` is specified in `options`, downloads a specific version. + @description: + Gets a download URL for the analysis script. + If version is specified in options, downloads a specific version. + + @see: + https://docs.tago.io/docs/tagoio/analysis/ Analysis + + @example: + If receive an error "Authorization Denied", check policy **Analysis** / **Download Analysis Script** in Access Management. + ```python + resources = Resources() + download = resources.analyses.downloadScript("analysis-id-123", {"version": 1}) + print(download["url"]) # https://... + print(download["expire_at"]) # 2025-01-13T... + ``` :param GenericID analysisID: Analysis identification - :param Optional[Dict[str, int]] options: Options for the Analysis script to download + :param Optional[Dict[str, int]] options: Options for the Analysis script to download (e.g., {"version": 1}) + :return: Dictionary with download URL, size information, and expiration date + :rtype: Dict """ version = options.get("version") if options else None @@ -181,3 +329,73 @@ def downloadScript(self, analysisID: GenericID, options: Optional[Dict[Literal[" ) result = dateParser(result, ["expire_at"]) return result + + def listSnippets(self, runtime: SnippetRuntime) -> SnippetsListResponse: + """ + @description: + Get all available snippets for a specific runtime environment. + Fetches analysis code snippets from the public TagoIO snippets repository. + + @see: + https://help.tago.io/portal/en/kb/articles/64-script-examples Script Examples + https://help.tago.io/portal/en/kb/articles/104-script-editor Script Editor + + @example: + ```python + resources = Resources() + deno_snippets = resources.analyses.listSnippets("deno-rt2025") + + # Print all snippet titles + for snippet in deno_snippets["snippets"]: + print(f"{snippet['title']}: {snippet['description']}") + ``` + + :param SnippetRuntime runtime: The runtime environment to get snippets for + :return: Snippets metadata including runtime, schema version, and list of available snippets + :rtype: SnippetsListResponse + """ + url = f"{SNIPPETS_BASE_URL}/{runtime}.json" + + try: + response = requests.get(url, headers={"Accept": "*/*"}, timeout=10) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + raise RuntimeError(f"Failed to fetch snippets: {e}") from e + + def getSnippetFile(self, runtime: SnippetRuntime, filename: str) -> str: + """ + @description: + Get the raw source code content of a specific snippet file. + Fetches the actual code content from the TagoIO snippets repository. + + @see: + https://help.tago.io/portal/en/kb/articles/64-script-examples Script Examples + https://help.tago.io/portal/en/kb/articles/104-script-editor Script Editor + + @example: + ```python + resources = Resources() + + # Get TypeScript code for console example + code = resources.analyses.getSnippetFile("deno-rt2025", "console.ts") + print(code) + + # Get Python code for data processing + python_code = resources.analyses.getSnippetFile("python-rt2025", "avg-min-max.py") + print(python_code) + ``` + + :param SnippetRuntime runtime: The runtime environment the snippet belongs to + :param str filename: The filename of the snippet to retrieve + :return: Raw file content as string + :rtype: str + """ + url = f"{SNIPPETS_BASE_URL}/{runtime}/{filename}" + + try: + response = requests.get(url, headers={"Accept": "*/*"}, timeout=10) + response.raise_for_status() + return response.text + except requests.exceptions.RequestException as e: + raise RuntimeError(f"Failed to fetch snippet file: {e}") from e diff --git a/src/tagoio_sdk/modules/Resources/Analysis_Types.py b/src/tagoio_sdk/modules/Resources/Analysis_Types.py index 3ab903d..4d32035 100644 --- a/src/tagoio_sdk/modules/Resources/Analysis_Types.py +++ b/src/tagoio_sdk/modules/Resources/Analysis_Types.py @@ -44,7 +44,11 @@ class AnalysisInfo(AnalysisCreateInfo): class AnalysisQuery(Query): - fields: Optional[List[Literal["name", "active", "run_on", "last_run", "created_at", "updated_at"]]] + fields: Optional[ + List[ + Literal["name", "active", "run_on", "last_run", "created_at", "updated_at"] + ] + ] class AnalysisListItem(TypedDict, total=False): @@ -57,3 +61,41 @@ class AnalysisListItem(TypedDict, total=False): updated_at: Optional[str] locked_at: Optional[str] console: Optional[List[str]] + + +SnippetRuntime = Literal[ + "node-legacy", "python-legacy", "node-rt2025", "python-rt2025", "deno-rt2025" +] +"""Available runtime environments for snippets""" + + +class SnippetItem(TypedDict): + """Individual snippet metadata""" + + id: str + """Unique identifier for the snippet""" + title: str + """Human-readable title""" + description: str + """Description of what the snippet does""" + language: str + """Programming language (typescript, javascript, python)""" + tags: List[str] + """Array of tags for categorization""" + filename: str + """Filename of the snippet""" + file_path: str + """Full path to the file in the runtime directory""" + + +class SnippetsListResponse(TypedDict): + """API response containing all snippets metadata for a runtime""" + + runtime: SnippetRuntime + """Runtime environment identifier""" + schema_version: int + """Schema version for the API response format""" + generated_at: str + """ISO timestamp when the response was generated""" + snippets: List[SnippetItem] + """Array of all available snippets for this runtime""" diff --git a/tests/Resources/test_analyses.py b/tests/Resources/test_analyses.py index bbd36c7..e031223 100644 --- a/tests/Resources/test_analyses.py +++ b/tests/Resources/test_analyses.py @@ -1,8 +1,13 @@ import os + from requests_mock.mocker import Mocker +from tagoio_sdk.modules.Resources.Analysis_Types import AnalysisCreateInfo +from tagoio_sdk.modules.Resources.Analysis_Types import AnalysisInfo +from tagoio_sdk.modules.Resources.Analysis_Types import AnalysisListItem +from tagoio_sdk.modules.Resources.Analysis_Types import ScriptFile from tagoio_sdk.modules.Resources.Resources import Resources -from tagoio_sdk.modules.Resources.Analysis_Types import AnalysisCreateInfo, AnalysisInfo, ScriptFile, AnalysisListItem + os.environ["T_ANALYSIS_TOKEN"] = "your_token_value" @@ -18,7 +23,7 @@ def mockAnalysisList() -> list[AnalysisListItem]: "updated_at": "2023-03-07T01:43:45.952Z", "last_run": "2023-03-07T01:43:45.952Z", } - ] + ], } @@ -31,18 +36,12 @@ def mockAnalysisInfo() -> AnalysisInfo: "created_at": "2023-03-07T01:43:45.952Z", "updated_at": "2023-03-07T01:43:45.952Z", "last_run": "2023-03-07T01:43:45.952Z", - } + }, } def mockCreateAnalysis() -> dict: - return { - "status": True, - "result": { - "id": "analysis_id", - "token": "analysis_token" - } - } + return {"status": True, "result": {"id": "analysis_id", "token": "analysis_token"}} def testAnalysesMethodList(requests_mock: Mocker) -> None: @@ -78,7 +77,10 @@ def testAnalysesMethodEdit(requests_mock: Mocker) -> None: :param requests_mock are a plugin of pytest to mock the requests. """ analysis_data = AnalysisInfo(name="Updated Analysis") - requests_mock.put("https://api.tago.io/analysis/analysis_id", json={"status": True, "result": "success"}) + requests_mock.put( + "https://api.tago.io/analysis/analysis_id", + json={"status": True, "result": "success"}, + ) resources = Resources() response = resources.analysis.edit("analysis_id", analysis_data) @@ -91,7 +93,10 @@ def testAnalysesMethodDelete(requests_mock: Mocker) -> None: """ :param requests_mock are a plugin of pytest to mock the requests. """ - requests_mock.delete("https://api.tago.io/analysis/analysis_id", json={"status": True, "result": "success"}) + requests_mock.delete( + "https://api.tago.io/analysis/analysis_id", + json={"status": True, "result": "success"}, + ) resources = Resources() response = resources.analysis.delete("analysis_id") @@ -104,7 +109,9 @@ def testAnalysesMethodInfo(requests_mock: Mocker) -> None: """ :param requests_mock are a plugin of pytest to mock the requests. """ - requests_mock.get("https://api.tago.io/analysis/analysis_id", json=mockAnalysisInfo()) + requests_mock.get( + "https://api.tago.io/analysis/analysis_id", json=mockAnalysisInfo() + ) resources = Resources() response = resources.analysis.info("analysis_id") @@ -117,7 +124,10 @@ def testAnalysesMethodRun(requests_mock: Mocker) -> None: """ :param requests_mock are a plugin of pytest to mock the requests. """ - requests_mock.post("https://api.tago.io/analysis/analysis_id/run", json={"status": True, "result": {"token": "run_token"}}) + requests_mock.post( + "https://api.tago.io/analysis/analysis_id/run", + json={"status": True, "result": {"token": "run_token"}}, + ) resources = Resources() response = resources.analysis.run("analysis_id") @@ -130,7 +140,10 @@ def testAnalysesMethodTokenGenerate(requests_mock: Mocker) -> None: """ :param requests_mock are a plugin of pytest to mock the requests. """ - requests_mock.get("https://api.tago.io/analysis/analysis_id/token", json={"status": True, "result": {"token": "new_token"}}) + requests_mock.get( + "https://api.tago.io/analysis/analysis_id/token", + json={"status": True, "result": {"token": "new_token"}}, + ) resources = Resources() response = resources.analysis.tokenGenerate("analysis_id") @@ -143,8 +156,13 @@ def testAnalysesMethodUploadScript(requests_mock: Mocker) -> None: """ :param requests_mock are a plugin of pytest to mock the requests. """ - script_file = ScriptFile(name="script.js", language="node", content="console.log('Hello, World!');") - requests_mock.post("https://api.tago.io/analysis/analysis_id/upload", json={"status": True, "result": "success"}) + script_file = ScriptFile( + name="script.js", language="node", content="console.log('Hello, World!');" + ) + requests_mock.post( + "https://api.tago.io/analysis/analysis_id/upload", + json={"status": True, "result": "success"}, + ) resources = Resources() response = resources.analysis.uploadScript("analysis_id", script_file) @@ -157,10 +175,120 @@ def testAnalysesMethodDownloadScript(requests_mock: Mocker) -> None: """ :param requests_mock are a plugin of pytest to mock the requests. """ - requests_mock.get("https://api.tago.io/analysis/analysis_id/download", json={"status": True, "result": {"url": "https://download.url"}}) + requests_mock.get( + "https://api.tago.io/analysis/analysis_id/download", + json={"status": True, "result": {"url": "https://download.url"}}, + ) resources = Resources() response = resources.analysis.downloadScript("analysis_id") assert response == {"url": "https://download.url"} assert isinstance(response, dict) + + +def testAnalysesMethodListSnippets(requests_mock: Mocker) -> None: + """ + Test listSnippets method to retrieve all available snippets for a runtime. + :param requests_mock are a plugin of pytest to mock the requests. + """ + mock_snippets_response = { + "runtime": "python-rt2025", + "schema_version": 1, + "generated_at": "2025-01-13T12:00:00Z", + "snippets": [ + { + "id": "console-example", + "title": "Console Example", + "description": "Basic console logging example", + "language": "python", + "tags": ["basics", "logging"], + "filename": "console.py", + "file_path": "python-rt2025/console.py", + }, + { + "id": "data-processing", + "title": "Data Processing", + "description": "Process device data", + "language": "python", + "tags": ["data", "processing"], + "filename": "process-data.py", + "file_path": "python-rt2025/process-data.py", + }, + ], + } + + requests_mock.get( + "https://snippets.tago.io/python-rt2025.json", json=mock_snippets_response + ) + + resources = Resources() + response = resources.analysis.listSnippets("python-rt2025") + + assert response["runtime"] == "python-rt2025" + assert response["schema_version"] == 1 + assert len(response["snippets"]) == 2 + assert response["snippets"][0]["id"] == "console-example" + assert response["snippets"][0]["title"] == "Console Example" + assert isinstance(response, dict) + assert isinstance(response["snippets"], list) + + +def testAnalysesMethodGetSnippetFile(requests_mock: Mocker) -> None: + """ + Test getSnippetFile method to retrieve raw snippet file content. + :param requests_mock are a plugin of pytest to mock the requests. + """ + mock_file_content = """# Console Example +from tagoio_sdk import Analysis + +def my_analysis(context): + context.log("Hello from Python snippet!") + +Analysis(my_analysis) +""" + + requests_mock.get( + "https://snippets.tago.io/python-rt2025/console.py", text=mock_file_content + ) + + resources = Resources() + response = resources.analysis.getSnippetFile("python-rt2025", "console.py") + + assert "Console Example" in response + assert "context.log" in response + assert isinstance(response, str) + + +def testAnalysesMethodListSnippetsError(requests_mock: Mocker) -> None: + """ + Test listSnippets method error handling when request fails. + :param requests_mock are a plugin of pytest to mock the requests. + """ + requests_mock.get("https://snippets.tago.io/invalid-runtime.json", status_code=404) + + resources = Resources() + + try: + resources.analysis.listSnippets("invalid-runtime") + raise AssertionError("Expected RuntimeError to be raised") + except RuntimeError as e: + assert "Failed to fetch snippets" in str(e) + + +def testAnalysesMethodGetSnippetFileError(requests_mock: Mocker) -> None: + """ + Test getSnippetFile method error handling when file not found. + :param requests_mock are a plugin of pytest to mock the requests. + """ + requests_mock.get( + "https://snippets.tago.io/python-rt2025/nonexistent.py", status_code=404 + ) + + resources = Resources() + + try: + resources.analysis.getSnippetFile("python-rt2025", "nonexistent.py") + raise AssertionError("Expected RuntimeError to be raised") + except RuntimeError as e: + assert "Failed to fetch snippet file" in str(e) From 95f38ed549d8b96d3a8d5d642ab18c8743a45652 Mon Sep 17 00:00:00 2001 From: Mateus Silva Date: Fri, 14 Nov 2025 11:07:04 -0300 Subject: [PATCH 2/7] feat: enhance regions support and Analysis runtime with async execution Expanded regions.py with EU region support, TDeploy project integration, and runtime region caching. Refactored Analysis class to support async/await execution patterns with improved error handling and console service integration. Removed deprecated api_socket.py infrastructure and added JSONParseSafe utility for safer JSON parsing. Updated Analysis type definitions with new constructor params and function signatures. Added comprehensive region tests covering TDeploy and multi-region scenarios. --- docs/source/conf.py | 2 +- src/tagoio_sdk/common/JSON_Parse_Safe.py | 14 + src/tagoio_sdk/infrastructure/api_socket.py | 35 --- src/tagoio_sdk/modules/Analysis/Analysis.py | 277 +++++++++++++----- .../modules/Analysis/Analysis_Type.py | 58 +++- src/tagoio_sdk/regions.py | 110 +++++-- tests/Regions/test_tdeploy.py | 92 ++++++ 7 files changed, 452 insertions(+), 136 deletions(-) create mode 100644 src/tagoio_sdk/common/JSON_Parse_Safe.py delete mode 100644 src/tagoio_sdk/infrastructure/api_socket.py create mode 100644 tests/Regions/test_tdeploy.py diff --git a/docs/source/conf.py b/docs/source/conf.py index 30148d1..e178612 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -61,4 +61,4 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ["_static"] +# html_static_path = ["_static"] diff --git a/src/tagoio_sdk/common/JSON_Parse_Safe.py b/src/tagoio_sdk/common/JSON_Parse_Safe.py new file mode 100644 index 0000000..b39eae6 --- /dev/null +++ b/src/tagoio_sdk/common/JSON_Parse_Safe.py @@ -0,0 +1,14 @@ +import json + +from typing import Any + + +def JSONParseSafe(jsonString: str, default: Any = None) -> Any: + """Safely parse JSON string with fallback to default value""" + if not jsonString: + return default + + try: + return json.loads(jsonString) + except (json.JSONDecodeError, TypeError, ValueError): + return default if default is not None else {} diff --git a/src/tagoio_sdk/infrastructure/api_socket.py b/src/tagoio_sdk/infrastructure/api_socket.py deleted file mode 100644 index dd20b55..0000000 --- a/src/tagoio_sdk/infrastructure/api_socket.py +++ /dev/null @@ -1,35 +0,0 @@ -import socketio - -from tagoio_sdk import config -from tagoio_sdk.common.tagoio_module import GenericModuleParams -from tagoio_sdk.regions import getConnectionURI - - -socketOptions = config.tagoSDKconfig["socketOpts"] - - -class APISocket: - def __init__(self, params: GenericModuleParams) -> None: - url = getConnectionURI(params.get("region"))["realtime"] - URLRealtime = "{}{}{}".format(url, "?token=", params.get("token")) - self.realtimeURL = URLRealtime - - sio = socketio.AsyncClient( - reconnection=socketOptions["reconnection"], - reconnection_delay=socketOptions["reconnectionDelay"], - ) - self.sio = sio - - async def connect(self) -> socketio.AsyncClient: - await self.sio.connect( - url=self.realtimeURL, transports=socketOptions["transports"] - ) - await self.sio.wait() - - -channels = { - "notification": "notification::data", - "analysisConsole": "analysis::console", - "analysisTrigger": "analysis::trigger", - "bucketData": "bucket::data", -} diff --git a/src/tagoio_sdk/modules/Analysis/Analysis.py b/src/tagoio_sdk/modules/Analysis/Analysis.py index a7f151c..ccdf43b 100644 --- a/src/tagoio_sdk/modules/Analysis/Analysis.py +++ b/src/tagoio_sdk/modules/Analysis/Analysis.py @@ -1,147 +1,264 @@ +import asyncio +import inspect import json import os -import signal import sys from typing import Any -from typing import Callable +from typing import List from typing import Optional +from tagoio_sdk.common.JSON_Parse_Safe import JSONParseSafe from tagoio_sdk.common.tagoio_module import TagoIOModule from tagoio_sdk.infrastructure.api_sse import openSSEListening +from tagoio_sdk.modules.Analysis.Analysis_Type import AnalysisConstructorParams from tagoio_sdk.modules.Analysis.Analysis_Type import AnalysisEnvironment -from tagoio_sdk.modules.Services import Services +from tagoio_sdk.modules.Analysis.Analysis_Type import AnalysisFunction +from tagoio_sdk.modules.Services.Console import ConsoleService +from tagoio_sdk.regions import getConnectionURI as getRegionObj +from tagoio_sdk.regions import setRuntimeRegion T_ANALYSIS_CONTEXT = os.environ.get("T_ANALYSIS_CONTEXT") or None class Analysis(TagoIOModule): - def __init__(self, params): + """ + Analysis execution context for TagoIO + + This class provides the runtime environment for executing analysis scripts in TagoIO. + It manages environment variables, console outputs, and analysis execution lifecycle. + Analyses can run locally for development or in the TagoIO cloud platform. + + Example: Basic analysis usage + ```python + from tagoio_sdk import Analysis + + def my_analysis(context, scope): + # Get analysis environment variables + environment = context.environment + + # Use console service for logging + context.log("Analysis started") + + # Your analysis logic here + print("Processing data...") + + analysis = Analysis(my_analysis, {"token": "your-analysis-token"}) + ``` + + Example: Environment variables + ```python + def my_analysis(context, scope): + env = context.environment + api_key = next((e["value"] for e in env if e["key"] == "API_KEY"), None) + + analysis = Analysis(my_analysis) + ``` + + Example: Manual start control + ```python + analysis = Analysis(my_analysis, { + "token": "token", + "autostart": False + }) + + # Start analysis manually + analysis.start() + ``` + """ + + def __init__(self, analysis: AnalysisFunction, params: Optional[AnalysisConstructorParams] = None): + if params is None: + params = {"token": "unknown"} + super().__init__(params) - self._running = True + self.analysis = analysis - def _signal_handler(self, signum, frame): - """Handle Ctrl+C gracefully""" - print("\n¬ Analysis stopped by user. Goodbye!") - self._running = False - sys.exit(0) + if params.get("autostart"): + self.start() - def init(self, analysis: Callable): - self._analysis = analysis + def start(self) -> None: + if self.started: + return - # Set up signal handler for graceful shutdown - signal.signal(signal.SIGINT, self._signal_handler) - signal.signal(signal.SIGTERM, self._signal_handler) + self.started = True - if T_ANALYSIS_CONTEXT is None: - self.__localRuntime() + if not os.environ.get("T_ANALYSIS_CONTEXT"): + self._localRuntime() else: - self.__runOnTagoIO() + self._runOnTagoIO() - def __runOnTagoIO(self): - def context(): - pass + def _runOnTagoIO(self) -> None: + if not self.analysis or not callable(self.analysis): + raise TypeError("Invalid analysis function") - context.log = print - context.token = os.environ["T_ANALYSIS_TOKEN"] - context.analysis_id = os.environ["T_ANALYSIS_ID"] - try: - context.environment = json.loads(os.environ["T_ANALYSIS_ENV"]) - except (KeyError, json.JSONDecodeError): - context.environment = [] + # Create context object + context = { + "log": print, + "token": os.environ.get("T_ANALYSIS_TOKEN", ""), + "environment": JSONParseSafe(os.environ.get("T_ANALYSIS_ENV", "[]"), []), + "analysis_id": os.environ.get("T_ANALYSIS_ID", ""), + } - try: - data = json.loads(os.environ["T_ANALYSIS_DATA"]) - except (KeyError, json.JSONDecodeError): - data = [] + data = JSONParseSafe(os.environ.get("T_ANALYSIS_DATA", "[]"), []) - self._analysis(context, data) + self.analysis(context, data) - def __runLocal( + def _stringifyMsg(self, msg: Any) -> str: + if isinstance(msg, dict) and not isinstance(msg, list): + return json.dumps(msg) + return str(msg) + + def _runLocal( self, - environment: list[AnalysisEnvironment], - data: list[Any], - analysis_id: str, + environment: List[AnalysisEnvironment], + data: List[Any], + analysisID: str, token: str, - ): - """Run Analysis @internal""" + ) -> None: + if not self.analysis or not callable(self.analysis): + raise TypeError("Invalid analysis function") + + tagoConsole = ConsoleService({"token": token, "region": self.params.get("region")}) - def log(*args: any): - print(*args) - log_message = " ".join(str(arg) for arg in args) - Services.Services({"token": token}).console.log(log_message) + def log(*args: Any) -> None: + """Log messages to console and TagoIO""" + # Only print locally if not auto-running + if not os.environ.get("T_ANALYSIS_AUTO_RUN"): + print(*args) - def context(): - pass + # Handle error objects with stack trace + processedArgs = [] + for arg in args: + if hasattr(arg, "stack"): + processedArgs.append(arg.stack) + else: + processedArgs.append(arg) - context.log = log - context.token = token - context.environment = environment - context.analysis_id = analysis_id + # Convert all arguments to strings + argsStrings = [self._stringifyMsg(arg) for arg in processedArgs] - self._analysis(context, data or []) + # Send to TagoIO console + try: + tagoConsole.log(" ".join(argsStrings)) + except Exception as e: + print(f"Console error: {e}", file=sys.stderr) - def __localRuntime(self): - analysis = self.doRequest({"path": "/info", "method": "GET"}) + context = { + "log": log, + "token": token, + "environment": environment, + "analysis_id": analysisID, + } + + # Execute analysis function + if inspect.iscoroutinefunction(self.analysis): + # Async function + try: + asyncio.run(self.analysis(context, data or [])) + except Exception as error: + log(error) + else: + # Sync function + try: + self.analysis(context, data or []) + except Exception as error: + log(error) + + def _localRuntime(self) -> None: + """Set up local runtime environment for development""" + if self.params.get("token") == "unknown": + raise ValueError("To run analysis locally, you need a token") + + try: + analysis = self.doRequest({"path": "/info", "method": "GET"}) + except Exception: + analysis = None if not analysis: - print("¬ Error :: Analysis not found or not active.") + print("¬ Error :: Analysis not found or not active.", file=sys.stderr) return if analysis.get("run_on") != "external": print("¬ Warning :: Analysis is not set to run on external") - tokenEnd = self.token[-5:] - + # Open SSE connection try: sse = openSSEListening( { - "token": self.token, - "region": self.region, + "token": self.params.get("token"), + "region": self.params.get("region"), "channel": "analysis_trigger", } ) - print( - f"\n¬ Connected to TagoIO :: Analysis [{analysis['name']}]({tokenEnd}) is ready." - ) - print("¬ Waiting for analysis trigger... (Press Ctrl+C to stop)\n") except Exception as e: - print("¬ Connection was closed, trying to reconnect...") - print(f"Error: {e}") + print(f"¬ Connection error: {e}", file=sys.stderr) return + tokenEnd = str(self.params.get("token", ""))[-5:] + + print(f"\n¬ Connected to TagoIO :: Analysis [{analysis.get('name', 'Unknown')}]({tokenEnd}) is ready.") + print("¬ Waiting for analysis trigger... (Press Ctrl+C to stop)\n") + try: for event in sse.events(): if not self._running: break try: - data = json.loads(event.data).get("payload") + parsed = JSONParseSafe(event.data, {}) + payload = parsed.get("payload") - if not data: + if not payload: continue - self.__runLocal( - data["environment"], - data["data"], - data["analysis_id"], - self.token, + self._runLocal( + payload.get("environment", []), + payload.get("data", []), + payload.get("analysis_id", ""), + payload.get("token", self.params.get("token", "")), ) - except RuntimeError: - print("¬ Connection was closed, trying to reconnect...") - pass + except Exception as e: + print(f"¬ Error processing event: {e}", file=sys.stderr) + continue + except KeyboardInterrupt: print("\n¬ Analysis stopped by user. Goodbye!") except Exception as e: - print(f"\n¬ Unexpected error: {e}") + print(f"¬ Connection was closed: {e}", file=sys.stderr) + print("¬ Trying to reconnect...") finally: self._running = False @staticmethod - def use(analysis: Callable, params: Optional[str] = {"token": "unknown"}): - if not os.environ.get("T_ANALYSIS_TOKEN"): - os.environ["T_ANALYSIS_TOKEN"] = params.get("token") - Analysis(params).init(analysis) - else: - Analysis({"token": os.environ["T_ANALYSIS_TOKEN"]}).init(analysis) + def use( + analysis: AnalysisFunction, + params: Optional[AnalysisConstructorParams] = None, + ) -> "Analysis": + """ + Create and configure Analysis instance with environment setup + + This static method provides a convenient way to create an Analysis instance + while automatically configuring environment variables and runtime region. + + Example: + ```python + def my_analysis(context, scope): + context.log("Hello from analysis!") + + analysis = Analysis.use(my_analysis, {"token": "my-token"}) + ``` + """ + if params is None: + params = {"token": "unknown"} + + if not os.environ.get("T_ANALYSIS_TOKEN") and params.get("token"): + os.environ["T_ANALYSIS_TOKEN"] = params["token"] + + # Configure runtime region + runtimeRegion = params.get("region") if getRegionObj(params["region"]) else None + if runtimeRegion: + setRuntimeRegion(runtimeRegion) + + return Analysis(analysis, params) diff --git a/src/tagoio_sdk/modules/Analysis/Analysis_Type.py b/src/tagoio_sdk/modules/Analysis/Analysis_Type.py index d7e73b2..f8bace7 100644 --- a/src/tagoio_sdk/modules/Analysis/Analysis_Type.py +++ b/src/tagoio_sdk/modules/Analysis/Analysis_Type.py @@ -1 +1,57 @@ -AnalysisEnvironment = dict[str, str] +from typing import Any +from typing import Callable +from typing import Dict +from typing import List +from typing import Optional +from typing import TypedDict +from typing import Union + +from tagoio_sdk.regions import Regions +from tagoio_sdk.regions import RegionsObj + + +AnalysisFunction = Callable[[Any, Any], Any] + + +class AnalysisConstructorParams(TypedDict, total=False): + token: Optional[str] + """Analysis token for authentication""" + region: Optional[Union[Regions, RegionsObj]] + """Region configuration for the analysis""" + autostart: Optional[bool] + """ + Auto start analysis after instance the class. + If turned off, you can start analysis by calling [AnalysisInstance].start(). + Default: True + """ + load_env_on_process: Optional[bool] + """ + Load TagoIO Analysis envs on process environment. + + Warning: It's not safe to use on external analysis. + It will load all env on process, then if the external analysis receives multiple requests + simultaneously, it can mess up. + + Default: False + """ + + +AnalysisEnvironment = Dict[str, str] + + +AnalysisToken = str + + +AnalysisID = str + + +class TagoContext(TypedDict): + """ + TagoIO Analysis Context interface. + As current version of the SDK doesn't provide the full TagoContext interface. + """ + + token: AnalysisToken + analysis_id: AnalysisID + environment: List[AnalysisEnvironment] + log: Callable[..., None] diff --git a/src/tagoio_sdk/regions.py b/src/tagoio_sdk/regions.py index 7df30f2..128d77b 100644 --- a/src/tagoio_sdk/regions.py +++ b/src/tagoio_sdk/regions.py @@ -1,55 +1,127 @@ import os -from contextlib import suppress from typing import Literal from typing import Optional from typing import TypedDict +from typing import Union -class RegionDefinition(TypedDict): +class RegionsObjApi(TypedDict): + """Region configuration with API/SSE endpoints.""" + api: str - realtime: str sse: str -# noRegionWarning = False +class RegionsObjTDeploy(TypedDict): + """Region configuration with TagoIO Deploy Project ID.""" + + tdeploy: str + + +RegionsObj = Union[RegionsObjApi, RegionsObjTDeploy] +"""Region configuration object (either API/SSE pair or TDeploy)""" + +Regions = Literal["us-e1", "eu-w1", "env"] +"""Supported TagoIO regions""" -regionsDefinition = { - "usa-1": { +# Runtime region cache +runtimeRegion: Optional[RegionsObj] = None + +# Object of Regions Definition +regionsDefinition: dict[str, Optional[RegionsObjApi]] = { + "us-e1": { "api": "https://api.tago.io", - "realtime": "wss://realtime.tago.io", "sse": "https://sse.tago.io/events", }, - "env": None, # ? process object should be on trycatch. + "eu-w1": { + "api": "https://api.eu-w1.tago.io", + "sse": "https://sse.eu-w1.tago.io/events", + }, + "env": None, # process object should be on trycatch } -Regions = Literal["usa-1", "env"] +def getConnectionURI(region: Optional[Union[Regions, RegionsObj]] = None) -> RegionsObjApi: + """ + Get connection URI for API and SSE. + + Args: + region: Region identifier or configuration object + + Returns: + Region configuration with API and SSE endpoints + + Raises: + ReferenceError: If invalid region is specified + """ + global runtimeRegion -def getConnectionURI(region: Optional[Regions]) -> RegionDefinition: - value = None - with suppress(KeyError): - value = regionsDefinition[region] + # Handle tdeploy in RegionsObj - takes precedence + if isinstance(region, dict) and "tdeploy" in region: + tdeploy = region["tdeploy"].strip() + if tdeploy: + return { + "api": f"https://api.{tdeploy}.tagoio.net", + "sse": f"https://sse.{tdeploy}.tagoio.net/events", + } + + normalized_region = region + if isinstance(normalized_region, str) and normalized_region == "usa-1": + normalized_region = "us-e1" + + value: Optional[RegionsObjApi] = None + if isinstance(normalized_region, str): + value = regionsDefinition.get(normalized_region) + elif isinstance(normalized_region, dict): + # If it's already a RegionsObj with api/sse, use it + if "api" in normalized_region and "sse" in normalized_region: + value = normalized_region if value is not None: return value + if runtimeRegion is not None: + return runtimeRegion + if region is not None and region != "env": - raise Exception(f"> TagoIO-SDK: Invalid region {region}.") + raise ReferenceError(f"> TagoIO-SDK: Invalid region {region}.") try: api = os.environ.get("TAGOIO_API") - realtime = os.environ.get("TAGOIO_REALTIME") sse = os.environ.get("TAGOIO_SSE") if not api and region != "env": raise Exception("Invalid Env") - return {"api": api, "realtime": realtime, "sse": sse} + return {"api": api or "", "sse": sse or ""} except Exception: - # global noRegionWarning - # if noRegionWarning is False: + # if not noRegionWarning: # print("> TagoIO-SDK: No region or env defined, using fallback as usa-1.") # noRegionWarning = True - return regionsDefinition["usa-1"] + return regionsDefinition["us-e1"] + + +def setRuntimeRegion(region: RegionsObj) -> None: + """ + Set region in-memory to be inherited by other modules when set in the Analysis runtime + with `Analysis.use()`. + + Example: + ```python + def my_analysis(context, scope): + # this uses the region defined through `use` + resources = Resources({"token": token}) + + # it's still possible to override if needed + europe_resources = Resources({"token": token, "region": "eu-w1"}) + + Analysis.use(my_analysis, {"region": "us-e1"}) + ``` + + Args: + region: Region configuration object + """ + global runtimeRegion + runtimeRegion = region diff --git a/tests/Regions/test_tdeploy.py b/tests/Regions/test_tdeploy.py new file mode 100644 index 0000000..166bf03 --- /dev/null +++ b/tests/Regions/test_tdeploy.py @@ -0,0 +1,92 @@ +from tagoio_sdk.regions import getConnectionURI + +"""Test suite for TagoIO Deploy (tdeploy) Region Support""" + + +def testShouldGenerateCorrectEndpointsForTdeployRegion(): + """Should generate correct endpoints for tdeploy region""" + tdeploy = "68951c0e023862b2aea00f3f" + region = {"tdeploy": tdeploy} + + result = getConnectionURI(region) + + assert result["api"] == f"https://api.{tdeploy}.tagoio.net" + assert result["sse"] == f"https://sse.{tdeploy}.tagoio.net/events" + + +def testShouldPrioritizeTdeployOverOtherFields(): + """Should prioritize tdeploy over other fields when both are provided""" + tdeploy = "68951c0e023862b2aea00f3f" + # mixing api/sse with tdeploy is no longer allowed by types; + # pass only tdeploy and ensure correct priority handling remains + region = {"tdeploy": tdeploy} + + result = getConnectionURI(region) + + assert result["api"] == f"https://api.{tdeploy}.tagoio.net" + assert result["sse"] == f"https://sse.{tdeploy}.tagoio.net/events" + + +def testShouldTrimWhitespaceFromTdeployValue(): + """Should trim whitespace from tdeploy value""" + tdeploy = " 68951c0e023862b2aea00f3f " + region = {"tdeploy": tdeploy} + + result = getConnectionURI(region) + + assert result["api"] == "https://api.68951c0e023862b2aea00f3f.tagoio.net" + assert result["sse"] == "https://sse.68951c0e023862b2aea00f3f.tagoio.net/events" + + +def testShouldFallbackToStandardBehaviorWhenTdeployIsEmpty(): + """Should fallback to standard behavior when tdeploy is empty""" + region = { + "tdeploy": "", + "api": "https://custom-api.example.com", + "sse": "https://custom-sse.example.com", + } + + # Empty tdeploy should fallback to api/sse fields + result = getConnectionURI(region) + + assert result["api"] == "https://custom-api.example.com" + assert result["sse"] == "https://custom-sse.example.com" + + +def testShouldFallbackToStandardBehaviorWhenTdeployIsWhitespaceOnly(): + """Should fallback to standard behavior when tdeploy is whitespace only""" + region = { + "tdeploy": " ", + "api": "https://custom-api.example.com", + "sse": "https://custom-sse.example.com", + } + + # Whitespace-only tdeploy should fallback to api/sse fields + result = getConnectionURI(region) + + assert result["api"] == "https://custom-api.example.com" + assert result["sse"] == "https://custom-sse.example.com" + + +def testShouldMaintainBackwardCompatibilityWithExistingRegions(): + """Should maintain backward compatibility with existing regions""" + result1 = getConnectionURI("us-e1") + assert result1["api"] == "https://api.tago.io" + assert result1["sse"] == "https://sse.tago.io/events" + + result2 = getConnectionURI("eu-w1") + assert result2["api"] == "https://api.eu-w1.tago.io" + assert result2["sse"] == "https://sse.eu-w1.tago.io/events" + + +def testShouldMaintainBackwardCompatibilityWithCustomRegionsObj(): + """Should maintain backward compatibility with custom RegionsObj""" + customRegion = { + "api": "https://my-api.com", + "sse": "https://my-sse.com", + } + + result = getConnectionURI(customRegion) + + assert result["api"] == "https://my-api.com" + assert result["sse"] == "https://my-sse.com" From 7001afa5acd325d6674fe1b2523532d9d8971eff Mon Sep 17 00:00:00 2001 From: Mateus Silva Date: Fri, 5 Dec 2025 15:22:08 -0300 Subject: [PATCH 3/7] feat: enhance Analysis class initialization and region handling - Add instance attributes (params, started, _running) for better state management - Fix autostart logic to explicitly check for False instead of falsy values - Fix region configuration to safely handle missing region parameter using .get() This improves the Analysis class initialization by adding proper state tracking and preventing potential KeyError when region is not provided. --- src/tagoio_sdk/modules/Analysis/Analysis.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/tagoio_sdk/modules/Analysis/Analysis.py b/src/tagoio_sdk/modules/Analysis/Analysis.py index ccdf43b..d115e0a 100644 --- a/src/tagoio_sdk/modules/Analysis/Analysis.py +++ b/src/tagoio_sdk/modules/Analysis/Analysis.py @@ -73,9 +73,12 @@ def __init__(self, analysis: AnalysisFunction, params: Optional[AnalysisConstruc params = {"token": "unknown"} super().__init__(params) + self.params = params self.analysis = analysis + self.started = False + self._running = True - if params.get("autostart"): + if params.get("autostart") != False: self.start() def start(self) -> None: @@ -257,7 +260,7 @@ def my_analysis(context, scope): os.environ["T_ANALYSIS_TOKEN"] = params["token"] # Configure runtime region - runtimeRegion = params.get("region") if getRegionObj(params["region"]) else None + runtimeRegion = params.get("region") if getRegionObj(params.get("region")) else None if runtimeRegion: setRuntimeRegion(runtimeRegion) From 1a1e970fef21365b83a6d2a8a52e1e8f0b445a28 Mon Sep 17 00:00:00 2001 From: Mateus Silva Date: Fri, 5 Dec 2025 15:28:27 -0300 Subject: [PATCH 4/7] feat: update Analysis class to start automatically based on autostart parameter --- src/tagoio_sdk/modules/Analysis/Analysis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tagoio_sdk/modules/Analysis/Analysis.py b/src/tagoio_sdk/modules/Analysis/Analysis.py index d115e0a..99d8c42 100644 --- a/src/tagoio_sdk/modules/Analysis/Analysis.py +++ b/src/tagoio_sdk/modules/Analysis/Analysis.py @@ -78,7 +78,7 @@ def __init__(self, analysis: AnalysisFunction, params: Optional[AnalysisConstruc self.started = False self._running = True - if params.get("autostart") != False: + if params.get("autostart"): self.start() def start(self) -> None: From 35506c6d58cbc93a987915c0dffa3116d244ea60 Mon Sep 17 00:00:00 2001 From: Mateus Silva Date: Fri, 5 Dec 2025 15:59:18 -0300 Subject: [PATCH 5/7] feat: modify autostart parameter default to True in Analysis class --- src/tagoio_sdk/modules/Analysis/Analysis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tagoio_sdk/modules/Analysis/Analysis.py b/src/tagoio_sdk/modules/Analysis/Analysis.py index 99d8c42..934cb4f 100644 --- a/src/tagoio_sdk/modules/Analysis/Analysis.py +++ b/src/tagoio_sdk/modules/Analysis/Analysis.py @@ -78,7 +78,7 @@ def __init__(self, analysis: AnalysisFunction, params: Optional[AnalysisConstruc self.started = False self._running = True - if params.get("autostart"): + if params.get("autostart", True): self.start() def start(self) -> None: From 3cf8960e3f2718c962a3d04bbf0cf4786d805503 Mon Sep 17 00:00:00 2001 From: Mateus Silva Date: Mon, 8 Dec 2025 09:12:11 -0300 Subject: [PATCH 6/7] feat: refactor Analysis class initialization and improve region support Restructured Analysis class to separate initialization from execution flow using new init() method pattern. Modified TagoContext from TypedDict to class for better runtime flexibility. Updated documentation examples to reflect Python runtime instead of Deno. --- .vscode/settings.json | 3 +- src/tagoio_sdk/modules/Analysis/Analysis.py | 104 ++++++++++-------- .../modules/Analysis/Analysis_Type.py | 2 +- src/tagoio_sdk/modules/Resources/Analyses.py | 4 +- 4 files changed, 63 insertions(+), 50 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index eb5a2c9..c847e64 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,6 +8,7 @@ "ignoretz", "PYPI", "pytest", - "serie_number" + "serie_number", + "Tago" ] } diff --git a/src/tagoio_sdk/modules/Analysis/Analysis.py b/src/tagoio_sdk/modules/Analysis/Analysis.py index 934cb4f..c4036ac 100644 --- a/src/tagoio_sdk/modules/Analysis/Analysis.py +++ b/src/tagoio_sdk/modules/Analysis/Analysis.py @@ -44,50 +44,69 @@ def my_analysis(context, scope): # Your analysis logic here print("Processing data...") - analysis = Analysis(my_analysis, {"token": "your-analysis-token"}) + Analysis.use(analysis=my_analysis, params={"token": "your-analysis-token"}) ``` - Example: Environment variables + Example: Analysis with EU region ```python + from tagoio_sdk import Analysis + def my_analysis(context, scope): - env = context.environment - api_key = next((e["value"] for e in env if e["key"] == "API_KEY"), None) + context.log("Running in EU region") + print("Environment:", context.environment) + + # Using Analysis.use() method + Analysis.use(analysis=my_analysis, params={"token": "your-analysis-token", "region": "eu-w1"}) + ``` - analysis = Analysis(my_analysis) + Example: Analysis with Tago Deploy + ```python + from tagoio_sdk import Analysis + + def my_analysis(context, scope): + context.log("Running in TDeploy") + print("Scope:", scope) + + # Tago Deploy requires a dictionary with tdeploy ID + Analysis.use( + analysis=my_analysis, + params={ + "token": "your-analysis-token", + "region": {"tdeploy": "your-tdeploy-id"} + } + ) ``` - Example: Manual start control + Example: Environment variables ```python - analysis = Analysis(my_analysis, { - "token": "token", - "autostart": False - }) + def my_analysis(context, scope): + env = context.environment + api_key = next((e["value"] for e in env if e["key"] == "API_KEY"), None) - # Start analysis manually - analysis.start() + Analysis.use(analysis=my_analysis, params={"token": "your-analysis-token"}) ``` """ - def __init__(self, analysis: AnalysisFunction, params: Optional[AnalysisConstructorParams] = None): + def __init__(self, params: Optional[AnalysisConstructorParams] = None): if params is None: params = {"token": "unknown"} super().__init__(params) self.params = params - self.analysis = analysis - self.started = False self._running = True - if params.get("autostart", True): - self.start() + def init(self, analysis: AnalysisFunction): + self.analysis = analysis - def start(self) -> None: - if self.started: - return + if not os.environ.get("T_ANALYSIS_TOKEN") and self.params.get("token"): + os.environ["T_ANALYSIS_TOKEN"] = self.params.get("token") - self.started = True + # Configure runtime region + runtimeRegion = getRegionObj(self.params["region"]) if self.params.get("region") else None + if runtimeRegion: + setRuntimeRegion(runtimeRegion) - if not os.environ.get("T_ANALYSIS_CONTEXT"): + if T_ANALYSIS_CONTEXT is None: self._localRuntime() else: self._runOnTagoIO() @@ -96,13 +115,13 @@ def _runOnTagoIO(self) -> None: if not self.analysis or not callable(self.analysis): raise TypeError("Invalid analysis function") - # Create context object - context = { - "log": print, - "token": os.environ.get("T_ANALYSIS_TOKEN", ""), - "environment": JSONParseSafe(os.environ.get("T_ANALYSIS_ENV", "[]"), []), - "analysis_id": os.environ.get("T_ANALYSIS_ID", ""), - } + def context(): + pass + + context.log = print + context.token = os.environ.get("T_ANALYSIS_TOKEN", "") + context.analysis_id = (JSONParseSafe(os.environ.get("T_ANALYSIS_ENV", "[]"), []),) + context.environment = (os.environ.get("T_ANALYSIS_ID", ""),) data = JSONParseSafe(os.environ.get("T_ANALYSIS_DATA", "[]"), []) @@ -148,12 +167,13 @@ def log(*args: Any) -> None: except Exception as e: print(f"Console error: {e}", file=sys.stderr) - context = { - "log": log, - "token": token, - "environment": environment, - "analysis_id": analysisID, - } + def context(): + pass + + context.log = log + context.token = token + context.environment = environment + context.analysis_id = analysisID # Execute analysis function if inspect.iscoroutinefunction(self.analysis): @@ -180,7 +200,7 @@ def _localRuntime(self) -> None: analysis = None if not analysis: - print("¬ Error :: Analysis not found or not active.", file=sys.stderr) + print("¬ Error :: Analysis not found or not active or invalid analysis token.", file=sys.stderr) return if analysis.get("run_on") != "external": @@ -220,7 +240,7 @@ def _localRuntime(self) -> None: payload.get("environment", []), payload.get("data", []), payload.get("analysis_id", ""), - payload.get("token", self.params.get("token", "")), + self.token, ) except Exception as e: print(f"¬ Error processing event: {e}", file=sys.stderr) @@ -256,12 +276,4 @@ def my_analysis(context, scope): if params is None: params = {"token": "unknown"} - if not os.environ.get("T_ANALYSIS_TOKEN") and params.get("token"): - os.environ["T_ANALYSIS_TOKEN"] = params["token"] - - # Configure runtime region - runtimeRegion = params.get("region") if getRegionObj(params.get("region")) else None - if runtimeRegion: - setRuntimeRegion(runtimeRegion) - - return Analysis(analysis, params) + return Analysis(params).init(analysis) diff --git a/src/tagoio_sdk/modules/Analysis/Analysis_Type.py b/src/tagoio_sdk/modules/Analysis/Analysis_Type.py index f8bace7..0c263f6 100644 --- a/src/tagoio_sdk/modules/Analysis/Analysis_Type.py +++ b/src/tagoio_sdk/modules/Analysis/Analysis_Type.py @@ -45,7 +45,7 @@ class AnalysisConstructorParams(TypedDict, total=False): AnalysisID = str -class TagoContext(TypedDict): +class TagoContext: """ TagoIO Analysis Context interface. As current version of the SDK doesn't provide the full TagoContext interface. diff --git a/src/tagoio_sdk/modules/Resources/Analyses.py b/src/tagoio_sdk/modules/Resources/Analyses.py index eb471df..8043c3f 100644 --- a/src/tagoio_sdk/modules/Resources/Analyses.py +++ b/src/tagoio_sdk/modules/Resources/Analyses.py @@ -343,10 +343,10 @@ def listSnippets(self, runtime: SnippetRuntime) -> SnippetsListResponse: @example: ```python resources = Resources() - deno_snippets = resources.analyses.listSnippets("deno-rt2025") + python_snippets = resources.analyses.listSnippets("python-rt2025") # Print all snippet titles - for snippet in deno_snippets["snippets"]: + for snippet in python_snippets["snippets"]: print(f"{snippet['title']}: {snippet['description']}") ``` From 539efeb971a92cd6b7b7be516bfac141888c8a6c Mon Sep 17 00:00:00 2001 From: Mateus Silva Date: Mon, 8 Dec 2025 10:16:53 -0300 Subject: [PATCH 7/7] feat: update Analysis class to correctly assign analysis_id and environment from environment variables --- src/tagoio_sdk/modules/Analysis/Analysis.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tagoio_sdk/modules/Analysis/Analysis.py b/src/tagoio_sdk/modules/Analysis/Analysis.py index c4036ac..0fdcd84 100644 --- a/src/tagoio_sdk/modules/Analysis/Analysis.py +++ b/src/tagoio_sdk/modules/Analysis/Analysis.py @@ -120,8 +120,8 @@ def context(): context.log = print context.token = os.environ.get("T_ANALYSIS_TOKEN", "") - context.analysis_id = (JSONParseSafe(os.environ.get("T_ANALYSIS_ENV", "[]"), []),) - context.environment = (os.environ.get("T_ANALYSIS_ID", ""),) + context.analysis_id = os.environ.get("T_ANALYSIS_ID", "") + context.environment = JSONParseSafe(os.environ.get("T_ANALYSIS_ENV", "[]"), []) data = JSONParseSafe(os.environ.get("T_ANALYSIS_DATA", "[]"), [])