diff --git a/.vscode/settings.json b/.vscode/settings.json
index eb5a2c9..c847e64 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -8,6 +8,7 @@
"ignoretz",
"PYPI",
"pytest",
- "serie_number"
+ "serie_number",
+ "Tago"
]
}
diff --git a/docs/source/Resources/Analysis/Analysis_Type.rst b/docs/source/Resources/Analysis/Analysis_Type.rst
index d7b7e83..e0fa082 100644
--- a/docs/source/Resources/Analysis/Analysis_Type.rst
+++ b/docs/source/Resources/Analysis/Analysis_Type.rst
@@ -102,3 +102,68 @@ AnalysisListItem
| locked_at: Optional[datetime]
| console: Optional[List[str]]
+
+
+.. _SnippetRuntime:
+
+SnippetRuntime
+--------------
+
+ Available runtime environments for snippets.
+
+ **Type:**
+
+ | Literal["node-legacy", "python-legacy", "node-rt2025", "python-rt2025", "deno-rt2025"]
+
+
+.. _SnippetItem:
+
+SnippetItem
+-----------
+
+ Individual snippet metadata.
+
+ **Attributes:**
+
+ | id: str
+ | Unique identifier for the snippet
+
+ | title: str
+ | Human-readable title
+
+ | description: str
+ | Description of what the snippet does
+
+ | language: str
+ | Programming language (typescript, javascript, python)
+
+ | tags: List[str]
+ | Array of tags for categorization
+
+ | filename: str
+ | Filename of the snippet
+
+ | file_path: str
+ | Full path to the file in the runtime directory
+
+
+.. _SnippetsListResponse:
+
+SnippetsListResponse
+--------------------
+
+ API response containing all snippets metadata for a runtime.
+
+ **Attributes:**
+
+ | runtime: :ref:`SnippetRuntime`
+ | Runtime environment identifier
+
+ | schema_version: int
+ | Schema version for the API response format
+
+ | generated_at: str
+ | ISO timestamp when the response was generated
+
+ | snippets: List[:ref:`SnippetItem`]
+ | Array of all available snippets for this runtime
diff --git a/docs/source/Resources/Analysis/index.rst b/docs/source/Resources/Analysis/index.rst
index 7086b44..46a95ca 100644
--- a/docs/source/Resources/Analysis/index.rst
+++ b/docs/source/Resources/Analysis/index.rst
@@ -1,13 +1,16 @@
**Analysis**
============
-Manage analysis in account.
+Manage analysis in your application.
=======
list
=======
-Retrieves a list with all analyses from the account
+Lists all analyses from the application with pagination support.
+Use this to retrieve and manage analyses in your application.
+
+See: `Analysis `_
**Parameters:**
@@ -17,238 +20,343 @@ Retrieves a list with all analyses from the account
.. code-block::
:caption: **Default queryObj:**
- queryObj: {
+ queryObj = {
"page": 1,
"fields": ["id", "name"],
"filter": {},
"amount": 20,
- "orderBy": ["name","asc"],
+ "orderBy": ["name", "asc"]
}
**Returns:**
| list[:ref:`AnalysisListItem`]
- .. code-block::
- :caption: **Example:**
+ .. code-block:: python
- from tagoio_sdk import Resources
+ # If receive an error "Authorization Denied", check policy "Analysis" / "Access" in Access Management.
+ from tagoio_sdk import Resources
- resources = Resources()
- resources.analysis.list()
+ resources = Resources()
+ list_result = resources.analyses.list({
+ "page": 1,
+ "fields": ["id", "name"],
+ "amount": 10,
+ "orderBy": ["name", "asc"]
+ })
+ print(list_result) # [{'id': 'analysis-id-123', 'name': 'Analysis Test', ...}]
=======
create
=======
-Create a new analysis
+Creates a new analysis in your application.
+
+See: `Creating Analysis `_
**Parameters:**
- | **analysisInfo**: :ref:`AnalysisCreateInfo`
- | Analysis information
+ | **analysisObj**: :ref:`AnalysisCreateInfo`
+ | Data object to create new TagoIO Analysis
**Returns:**
| Dict[str, GenericID | GenericToken]
- .. code-block::
- :caption: **Example:**
+ .. code-block:: python
- from tagoio_sdk import Resources
+ # If receive an error "Authorization Denied", check policy "Analysis" / "Create" in Access Management.
+ from tagoio_sdk import Resources
- resources = Resources()
- resources.analysis.create({
- "name": "My Analysis",
- "runtime": "python",
- "active": True,
- })
+ resources = Resources()
+ new_analysis = resources.analyses.create({
+ "name": "My Analysis",
+ "runtime": "python",
+ "tags": [{"key": "type", "value": "data-processing"}]
+ })
+ print(new_analysis["id"], new_analysis["token"]) # analysis-id-123, analysis-token-123
=======
edit
=======
-Modify any property of the analyze
+Modifies an existing analysis.
+
+See: `Analysis `_
**Parameters:**
- | **analysisID**: GenericID: str
- | Analysis ID
+ | **analysisID**: str
+ | Analysis identification
- | **analysisInfo**: :ref:`AnalysisCreateInfo`
- | Analysis information
+ | **analysisObj**: :ref:`AnalysisInfo`
+ | Analysis object with data to replace
**Returns:**
| string
- .. code-block::
- :caption: **Example:**
+ .. code-block:: python
- from tagoio_sdk import Resources
+ # If receive an error "Authorization Denied", check policy "Analysis" / "Create" in Access Management.
+ from tagoio_sdk import Resources
- resources = Resources()
- resources.analysis.edit("analysisID", { "name": "My Analysis Edited" })
+ resources = Resources()
+ result = resources.analyses.edit("analysis-id-123", {
+ "name": "Updated Analysis",
+ "active": False
+ })
+ print(result) # Successfully Updated
=======
delete
=======
-Deletes an analysis from the account
+Deletes an analysis from your application.
+
+See: `Analysis `_
**Parameters:**
- | **analysisID**: GenericID: str
- | Analysis ID
+ | **analysisID**: str
+ | Analysis identification
**Returns:**
| string
- .. code-block::
- :caption: **Example:**
+ .. code-block:: python
- from tagoio_sdk import Resources
+ # If receive an error "Authorization Denied", check policy "Analysis" / "Delete" in Access Management.
+ from tagoio_sdk import Resources
- resources = Resources()
- resources.analysis.delete("analysisID")
+ resources = Resources()
+ result = resources.analyses.delete("analysis-id-123")
+ print(result) # Successfully Removed
=======
info
=======
-Gets information about an analysis
+Retrieves detailed information about a specific analysis.
+
+See: `Analysis `_
**Parameters:**
- | **analysisID**: GenericID: str
- | Analysis ID
+ | **analysisID**: str
+ | Analysis identification
**Returns:**
| :ref:`AnalysisInfo`
- .. code-block::
- :caption: **Example:**
+ .. code-block:: python
- from tagoio_sdk import Resources
+ # If receive an error "Authorization Denied", check policy "Analysis" / "Access" in Access Management.
+ from tagoio_sdk import Resources
- resources = Resources()
- resources.analysis.info("analysisID")
+ resources = Resources()
+ analysis_info = resources.analyses.info("analysis-id-123")
+ print(analysis_info) # {'id': 'analysis-id-123', 'name': 'My Analysis', ...}
=======
run
=======
-Run an analysis
+Executes an analysis with optional scope parameters.
+
+See: `Analysis `_
**Parameters:**
- | **analysisID**: GenericID: str
- | Analysis ID
+ | **analysisID**: str
+ | Analysis identification
+
+ | *Optional* **scopeObj**: Dict[str, Any]
+ | Simulate scope for analysis
**Returns:**
| Dict[str, GenericToken]
- .. code-block::
- :caption: **Example:**
+ .. code-block:: python
- from tagoio_sdk import Resources
+ # If receive an error "Authorization Denied", check policy "Analysis" / "Run Analysis" in Access Management.
+ from tagoio_sdk import Resources
- resources = Resources()
- resources.analysis.run("analysisID")
+ resources = Resources()
+ result = resources.analyses.run("analysis-id-123", {"environment": "production"})
+ print(result["analysis_token"]) # analysis-token-123
=============
tokenGenerate
=============
-Generate a new token for the analysis
+Generates a new token for the analysis.
+This is only allowed when the analysis is running in external mode.
+
+See: `Analysis `_
**Parameters:**
- | **analysisID**: GenericID: str
- | Analysis ID
+ | **analysisID**: str
+ | Analysis identification
**Returns:**
| Dict[str, str]
- .. code-block::
- :caption: **Example:**
+ .. code-block:: python
- from tagoio_sdk import Resources
+ from tagoio_sdk import Resources
- resources = Resources()
- resources.analysis.tokenGenerate("analysisID")
+ resources = Resources()
+ token = resources.analyses.tokenGenerate("analysis-id-123")
+ print(token["analysis_token"]) # analysis-token-123
============
uploadScript
============
-Upload a file (base64) to Analysis. Automatically erase the old one
+Uploads a script file to an analysis.
+The file content must be base64-encoded. This automatically replaces the old script.
+
+See: `Analysis `_
**Parameters:**
- | **analysisID**: GenericID: str
- | Analysis ID
+ | **analysisID**: str
+ | Analysis identification
- | **file**: :ref:`ScriptFile`
- | File information
+ | **fileObj**: :ref:`ScriptFile`
+ | Object with name, language and content (base64) of the file
**Returns:**
| string
- .. code-block::
- :caption: **Example:**
+ .. code-block:: python
- from tagoio_sdk import Resources
- import base64
+ # If receive an error "Authorization Denied", check policy "Analysis" / "Upload Analysis Script" in Access Management.
+ from tagoio_sdk import Resources
- data = "print(Hello, World!)"
- encoded_bytes = base64.b64encode(data.encode('utf-8')).decode('utf-8')
-
- resources = Resources()
- resources.analysis.uploadScript("analysisID", {
- "name": "My Script",
- "content": encoded_bytes,
- "language": "python",
- })
+ resources = Resources()
+ result = resources.analyses.uploadScript("analysis-id-123", {
+ "name": "script.py",
+ "content": "base64-encoded-content",
+ "language": "python"
+ })
+ print(result) # Successfully Uploaded
==============
downloadScript
==============
-Get a url to download the analysis. If `version` is specified in `options`, downloads a specific version.
+Gets a download URL for the analysis script.
+If version is specified in options, downloads a specific version.
+
+See: `Analysis `_
**Parameters:**
- | **analysisID**: GenericID: str
- | Analysis ID
+ | **analysisID**: str
+ | Analysis identification
- | *Optional* **options**: Dict["version", int]
- | Options
+ | *Optional* **options**: Dict[Literal["version"], int]
+ | Options for the Analysis script to download (e.g., {"version": 1})
**Returns:**
- | Dict[str, Any]
+ | Dict
+
+ .. code-block:: python
+
+ # If receive an error "Authorization Denied", check policy "Analysis" / "Download Analysis Script" in Access Management.
+ from tagoio_sdk import Resources
+
+ resources = Resources()
+ download = resources.analyses.downloadScript("analysis-id-123", {"version": 1})
+ print(download["url"]) # https://...
+ print(download["expire_at"]) # 2025-01-13T...
+
+
+============
+listSnippets
+============
+
+Get all available snippets for a specific runtime environment.
+Fetches analysis code snippets from the public TagoIO snippets repository.
+
+See: `Script Examples `_
+
+See: `Script Editor `_
+
+ **Parameters:**
+
+ | **runtime**: :ref:`SnippetRuntime`
+ | The runtime environment to get snippets for
+
+ **Returns:**
+
+ | :ref:`SnippetsListResponse`
+
+ .. code-block:: python
+
+ from tagoio_sdk import Resources
+
+ resources = Resources()
+ deno_snippets = resources.analyses.listSnippets("deno-rt2025")
+
+ # Print all snippet titles
+ for snippet in deno_snippets["snippets"]:
+ print(f"{snippet['title']}: {snippet['description']}")
+
+
+==============
+getSnippetFile
+==============
+
+Get the raw source code content of a specific snippet file.
+Fetches the actual code content from the TagoIO snippets repository.
+
+See: `Script Examples `_
+
+See: `Script Editor `_
+
+ **Parameters:**
+
+ | **runtime**: :ref:`SnippetRuntime`
+ | The runtime environment the snippet belongs to
+
+ | **filename**: str
+ | The filename of the snippet to retrieve
+
+ **Returns:**
+
+ | str
+
+ .. code-block:: python
+
+ from tagoio_sdk import Resources
- .. code-block::
- :caption: **Example:**
+ resources = Resources()
- from tagoio_sdk import Resources
+ # Get TypeScript code for console example
+ code = resources.analyses.getSnippetFile("deno-rt2025", "console.ts")
+ print(code)
- resources = Resources()
- resources.analysis.downloadScript("analysisID")
+ # Get Python code for data processing
+ python_code = resources.analyses.getSnippetFile("python-rt2025", "avg-min-max.py")
+ print(python_code)
.. toctree::
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 30148d1..e178612 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -61,4 +61,4 @@
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = ["_static"]
+# html_static_path = ["_static"]
diff --git a/src/tagoio_sdk/common/JSON_Parse_Safe.py b/src/tagoio_sdk/common/JSON_Parse_Safe.py
new file mode 100644
index 0000000..b39eae6
--- /dev/null
+++ b/src/tagoio_sdk/common/JSON_Parse_Safe.py
@@ -0,0 +1,14 @@
+import json
+
+from typing import Any
+
+
+def JSONParseSafe(jsonString: str, default: Any = None) -> Any:
+ """Safely parse JSON string with fallback to default value"""
+ if not jsonString:
+ return default
+
+ try:
+ return json.loads(jsonString)
+ except (json.JSONDecodeError, TypeError, ValueError):
+ return default if default is not None else {}
diff --git a/src/tagoio_sdk/infrastructure/api_socket.py b/src/tagoio_sdk/infrastructure/api_socket.py
deleted file mode 100644
index dd20b55..0000000
--- a/src/tagoio_sdk/infrastructure/api_socket.py
+++ /dev/null
@@ -1,35 +0,0 @@
-import socketio
-
-from tagoio_sdk import config
-from tagoio_sdk.common.tagoio_module import GenericModuleParams
-from tagoio_sdk.regions import getConnectionURI
-
-
-socketOptions = config.tagoSDKconfig["socketOpts"]
-
-
-class APISocket:
- def __init__(self, params: GenericModuleParams) -> None:
- url = getConnectionURI(params.get("region"))["realtime"]
- URLRealtime = "{}{}{}".format(url, "?token=", params.get("token"))
- self.realtimeURL = URLRealtime
-
- sio = socketio.AsyncClient(
- reconnection=socketOptions["reconnection"],
- reconnection_delay=socketOptions["reconnectionDelay"],
- )
- self.sio = sio
-
- async def connect(self) -> socketio.AsyncClient:
- await self.sio.connect(
- url=self.realtimeURL, transports=socketOptions["transports"]
- )
- await self.sio.wait()
-
-
-channels = {
- "notification": "notification::data",
- "analysisConsole": "analysis::console",
- "analysisTrigger": "analysis::trigger",
- "bucketData": "bucket::data",
-}
diff --git a/src/tagoio_sdk/modules/Analysis/Analysis.py b/src/tagoio_sdk/modules/Analysis/Analysis.py
index a7f151c..0fdcd84 100644
--- a/src/tagoio_sdk/modules/Analysis/Analysis.py
+++ b/src/tagoio_sdk/modules/Analysis/Analysis.py
@@ -1,76 +1,171 @@
+import asyncio
+import inspect
import json
import os
-import signal
import sys
from typing import Any
-from typing import Callable
+from typing import List
from typing import Optional
+from tagoio_sdk.common.JSON_Parse_Safe import JSONParseSafe
from tagoio_sdk.common.tagoio_module import TagoIOModule
from tagoio_sdk.infrastructure.api_sse import openSSEListening
+from tagoio_sdk.modules.Analysis.Analysis_Type import AnalysisConstructorParams
from tagoio_sdk.modules.Analysis.Analysis_Type import AnalysisEnvironment
-from tagoio_sdk.modules.Services import Services
+from tagoio_sdk.modules.Analysis.Analysis_Type import AnalysisFunction
+from tagoio_sdk.modules.Services.Console import ConsoleService
+from tagoio_sdk.regions import getConnectionURI as getRegionObj
+from tagoio_sdk.regions import setRuntimeRegion
T_ANALYSIS_CONTEXT = os.environ.get("T_ANALYSIS_CONTEXT") or None
class Analysis(TagoIOModule):
- def __init__(self, params):
+ """
+ Analysis execution context for TagoIO
+
+ This class provides the runtime environment for executing analysis scripts in TagoIO.
+ It manages environment variables, console outputs, and analysis execution lifecycle.
+ Analyses can run locally for development or in the TagoIO cloud platform.
+
+ Example: Basic analysis usage
+ ```python
+ from tagoio_sdk import Analysis
+
+ def my_analysis(context, scope):
+ # Get analysis environment variables
+ environment = context.environment
+
+ # Use console service for logging
+ context.log("Analysis started")
+
+ # Your analysis logic here
+ print("Processing data...")
+
+ Analysis.use(analysis=my_analysis, params={"token": "your-analysis-token"})
+ ```
+
+ Example: Analysis with EU region
+ ```python
+ from tagoio_sdk import Analysis
+
+ def my_analysis(context, scope):
+ context.log("Running in EU region")
+ print("Environment:", context.environment)
+
+ # Using Analysis.use() method
+ Analysis.use(analysis=my_analysis, params={"token": "your-analysis-token", "region": "eu-w1"})
+ ```
+
+ Example: Analysis with Tago Deploy
+ ```python
+ from tagoio_sdk import Analysis
+
+ def my_analysis(context, scope):
+ context.log("Running in TDeploy")
+ print("Scope:", scope)
+
+ # Tago Deploy requires a dictionary with tdeploy ID
+ Analysis.use(
+ analysis=my_analysis,
+ params={
+ "token": "your-analysis-token",
+ "region": {"tdeploy": "your-tdeploy-id"}
+ }
+ )
+ ```
+
+ Example: Environment variables
+ ```python
+ def my_analysis(context, scope):
+ env = context.environment
+ api_key = next((e["value"] for e in env if e["key"] == "API_KEY"), None)
+
+ Analysis.use(analysis=my_analysis, params={"token": "your-analysis-token"})
+ ```
+ """
+
+ def __init__(self, params: Optional[AnalysisConstructorParams] = None):
+ if params is None:
+ params = {"token": "unknown"}
+
super().__init__(params)
+ self.params = params
self._running = True
- def _signal_handler(self, signum, frame):
- """Handle Ctrl+C gracefully"""
- print("\n¬ Analysis stopped by user. Goodbye!")
- self._running = False
- sys.exit(0)
+ def init(self, analysis: AnalysisFunction):
+ self.analysis = analysis
- def init(self, analysis: Callable):
- self._analysis = analysis
+ if not os.environ.get("T_ANALYSIS_TOKEN") and self.params.get("token"):
+ os.environ["T_ANALYSIS_TOKEN"] = self.params.get("token")
- # Set up signal handler for graceful shutdown
- signal.signal(signal.SIGINT, self._signal_handler)
- signal.signal(signal.SIGTERM, self._signal_handler)
+ # Configure runtime region
+ runtimeRegion = getRegionObj(self.params["region"]) if self.params.get("region") else None
+ if runtimeRegion:
+ setRuntimeRegion(runtimeRegion)
if T_ANALYSIS_CONTEXT is None:
- self.__localRuntime()
+ self._localRuntime()
else:
- self.__runOnTagoIO()
+ self._runOnTagoIO()
+
+ def _runOnTagoIO(self) -> None:
+ if not self.analysis or not callable(self.analysis):
+ raise TypeError("Invalid analysis function")
- def __runOnTagoIO(self):
def context():
pass
context.log = print
- context.token = os.environ["T_ANALYSIS_TOKEN"]
- context.analysis_id = os.environ["T_ANALYSIS_ID"]
- try:
- context.environment = json.loads(os.environ["T_ANALYSIS_ENV"])
- except (KeyError, json.JSONDecodeError):
- context.environment = []
+ context.token = os.environ.get("T_ANALYSIS_TOKEN", "")
+ context.analysis_id = os.environ.get("T_ANALYSIS_ID", "")
+ context.environment = JSONParseSafe(os.environ.get("T_ANALYSIS_ENV", "[]"), [])
- try:
- data = json.loads(os.environ["T_ANALYSIS_DATA"])
- except (KeyError, json.JSONDecodeError):
- data = []
+ data = JSONParseSafe(os.environ.get("T_ANALYSIS_DATA", "[]"), [])
- self._analysis(context, data)
+ self.analysis(context, data)
- def __runLocal(
+ def _stringifyMsg(self, msg: Any) -> str:
+ if isinstance(msg, dict) and not isinstance(msg, list):
+ return json.dumps(msg)
+ return str(msg)
+
+ def _runLocal(
self,
- environment: list[AnalysisEnvironment],
- data: list[Any],
- analysis_id: str,
+ environment: List[AnalysisEnvironment],
+ data: List[Any],
+ analysisID: str,
token: str,
- ):
- """Run Analysis @internal"""
+ ) -> None:
+ if not self.analysis or not callable(self.analysis):
+ raise TypeError("Invalid analysis function")
+
+ tagoConsole = ConsoleService({"token": token, "region": self.params.get("region")})
+
+ def log(*args: Any) -> None:
+ """Log messages to console and TagoIO"""
+ # Only print locally if not auto-running
+ if not os.environ.get("T_ANALYSIS_AUTO_RUN"):
+ print(*args)
+
+ # Handle error objects with stack trace
+ processedArgs = []
+ for arg in args:
+ if hasattr(arg, "stack"):
+ processedArgs.append(arg.stack)
+ else:
+ processedArgs.append(arg)
+
+ # Convert all arguments to strings
+ argsStrings = [self._stringifyMsg(arg) for arg in processedArgs]
- def log(*args: any):
- print(*args)
- log_message = " ".join(str(arg) for arg in args)
- Services.Services({"token": token}).console.log(log_message)
+ # Send to TagoIO console
+ try:
+ tagoConsole.log(" ".join(argsStrings))
+ except Exception as e:
+ print(f"Console error: {e}", file=sys.stderr)
def context():
pass
@@ -78,70 +173,107 @@ def context():
context.log = log
context.token = token
context.environment = environment
- context.analysis_id = analysis_id
+ context.analysis_id = analysisID
- self._analysis(context, data or [])
+ # Execute analysis function
+ if inspect.iscoroutinefunction(self.analysis):
+ # Async function
+ try:
+ asyncio.run(self.analysis(context, data or []))
+ except Exception as error:
+ log(error)
+ else:
+ # Sync function
+ try:
+ self.analysis(context, data or [])
+ except Exception as error:
+ log(error)
+
+ def _localRuntime(self) -> None:
+ """Set up local runtime environment for development"""
+ if self.params.get("token") == "unknown":
+ raise ValueError("To run analysis locally, you need a token")
- def __localRuntime(self):
- analysis = self.doRequest({"path": "/info", "method": "GET"})
+ try:
+ analysis = self.doRequest({"path": "/info", "method": "GET"})
+ except Exception:
+ analysis = None
if not analysis:
- print("¬ Error :: Analysis not found or not active.")
+ print("¬ Error :: Analysis not found or not active or invalid analysis token.", file=sys.stderr)
return
if analysis.get("run_on") != "external":
print("¬ Warning :: Analysis is not set to run on external")
- tokenEnd = self.token[-5:]
-
+ # Open SSE connection
try:
sse = openSSEListening(
{
- "token": self.token,
- "region": self.region,
+ "token": self.params.get("token"),
+ "region": self.params.get("region"),
"channel": "analysis_trigger",
}
)
- print(
- f"\n¬ Connected to TagoIO :: Analysis [{analysis['name']}]({tokenEnd}) is ready."
- )
- print("¬ Waiting for analysis trigger... (Press Ctrl+C to stop)\n")
except Exception as e:
- print("¬ Connection was closed, trying to reconnect...")
- print(f"Error: {e}")
+ print(f"¬ Connection error: {e}", file=sys.stderr)
return
+ tokenEnd = str(self.params.get("token", ""))[-5:]
+
+ print(f"\n¬ Connected to TagoIO :: Analysis [{analysis.get('name', 'Unknown')}]({tokenEnd}) is ready.")
+ print("¬ Waiting for analysis trigger... (Press Ctrl+C to stop)\n")
+
try:
for event in sse.events():
if not self._running:
break
try:
- data = json.loads(event.data).get("payload")
+ parsed = JSONParseSafe(event.data, {})
+ payload = parsed.get("payload")
- if not data:
+ if not payload:
continue
- self.__runLocal(
- data["environment"],
- data["data"],
- data["analysis_id"],
+ self._runLocal(
+ payload.get("environment", []),
+ payload.get("data", []),
+ payload.get("analysis_id", ""),
self.token,
)
- except RuntimeError:
- print("¬ Connection was closed, trying to reconnect...")
- pass
+ except Exception as e:
+ print(f"¬ Error processing event: {e}", file=sys.stderr)
+ continue
+
except KeyboardInterrupt:
print("\n¬ Analysis stopped by user. Goodbye!")
except Exception as e:
- print(f"\n¬ Unexpected error: {e}")
+ print(f"¬ Connection was closed: {e}", file=sys.stderr)
+ print("¬ Trying to reconnect...")
finally:
self._running = False
@staticmethod
- def use(analysis: Callable, params: Optional[str] = {"token": "unknown"}):
- if not os.environ.get("T_ANALYSIS_TOKEN"):
- os.environ["T_ANALYSIS_TOKEN"] = params.get("token")
- Analysis(params).init(analysis)
- else:
- Analysis({"token": os.environ["T_ANALYSIS_TOKEN"]}).init(analysis)
+ def use(
+ analysis: AnalysisFunction,
+ params: Optional[AnalysisConstructorParams] = None,
+ ) -> "Analysis":
+ """
+ Create and configure Analysis instance with environment setup
+
+ This static method provides a convenient way to create an Analysis instance
+ while automatically configuring environment variables and runtime region.
+
+ Example:
+ ```python
+ def my_analysis(context, scope):
+ context.log("Hello from analysis!")
+
+ analysis = Analysis.use(my_analysis, {"token": "my-token"})
+ ```
+ """
+ if params is None:
+ params = {"token": "unknown"}
+
+ return Analysis(params).init(analysis)
diff --git a/src/tagoio_sdk/modules/Analysis/Analysis_Type.py b/src/tagoio_sdk/modules/Analysis/Analysis_Type.py
index d7e73b2..0c263f6 100644
--- a/src/tagoio_sdk/modules/Analysis/Analysis_Type.py
+++ b/src/tagoio_sdk/modules/Analysis/Analysis_Type.py
@@ -1 +1,57 @@
-AnalysisEnvironment = dict[str, str]
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import TypedDict
+from typing import Union
+
+from tagoio_sdk.regions import Regions
+from tagoio_sdk.regions import RegionsObj
+
+
+AnalysisFunction = Callable[[Any, Any], Any]
+
+
+class AnalysisConstructorParams(TypedDict, total=False):
+ token: Optional[str]
+ """Analysis token for authentication"""
+ region: Optional[Union[Regions, RegionsObj]]
+ """Region configuration for the analysis"""
+ autostart: Optional[bool]
+ """
+ Auto start analysis after instance the class.
+ If turned off, you can start analysis by calling [AnalysisInstance].start().
+ Default: True
+ """
+ load_env_on_process: Optional[bool]
+ """
+ Load TagoIO Analysis envs on process environment.
+
+ Warning: It's not safe to use on external analysis.
+ It will load all env on process, then if the external analysis receives multiple requests
+ simultaneously, it can mess up.
+
+ Default: False
+ """
+
+
+AnalysisEnvironment = Dict[str, str]
+
+
+AnalysisToken = str
+
+
+AnalysisID = str
+
+
+class TagoContext:
+ """
+ TagoIO Analysis Context interface.
+ As current version of the SDK doesn't provide the full TagoContext interface.
+ """
+
+ token: AnalysisToken
+ analysis_id: AnalysisID
+ environment: List[AnalysisEnvironment]
+ log: Callable[..., None]
diff --git a/src/tagoio_sdk/modules/Resources/Analyses.py b/src/tagoio_sdk/modules/Resources/Analyses.py
index af19462..8043c3f 100644
--- a/src/tagoio_sdk/modules/Resources/Analyses.py
+++ b/src/tagoio_sdk/modules/Resources/Analyses.py
@@ -4,6 +4,8 @@
from typing import Literal
from typing import Optional
+import requests
+
from tagoio_sdk.common.Common_Type import GenericID
from tagoio_sdk.common.Common_Type import GenericToken
from tagoio_sdk.common.tagoio_module import TagoIOModule
@@ -12,26 +14,42 @@
from tagoio_sdk.modules.Resources.Analysis_Types import AnalysisListItem
from tagoio_sdk.modules.Resources.Analysis_Types import AnalysisQuery
from tagoio_sdk.modules.Resources.Analysis_Types import ScriptFile
+from tagoio_sdk.modules.Resources.Analysis_Types import SnippetRuntime
+from tagoio_sdk.modules.Resources.Analysis_Types import SnippetsListResponse
from tagoio_sdk.modules.Utils.dateParser import dateParser
from tagoio_sdk.modules.Utils.dateParser import dateParserList
+# Base URL for TagoIO analysis snippets repository
+SNIPPETS_BASE_URL = "https://snippets.tago.io"
+
+
class Analyses(TagoIOModule):
def list(self, queryObj: Optional[AnalysisQuery] = None) -> List[AnalysisListItem]:
"""
- Retrieves a list with all analyses from the account
+ @description:
+ Lists all analyses from the application with pagination support.
+ Use this to retrieve and manage analyses in your application.
- :default:
+ @see:
+ https://docs.tago.io/docs/tagoio/analysis/ Analysis
- queryObj: {
+ @example:
+ If receive an error "Authorization Denied", check policy **Analysis** / **Access** in Access Management.
+ ```python
+ resources = Resources()
+ list_result = resources.analyses.list({
"page": 1,
"fields": ["id", "name"],
- "filter": {},
- "amount": 20,
- "orderBy": ["name", "asc"],
- }
+ "amount": 10,
+ "orderBy": ["name", "asc"]
+ })
+ print(list_result) # [{'id': 'analysis-id-123', 'name': 'Analysis Test', ...}]
+ ```
- :param AnalysisQuery queryObj: Search query params
+ :param AnalysisQuery queryObj: Search query params (optional)
+ :return: List of analysis items matching the query
+ :rtype: List[AnalysisListItem]
"""
queryObj = queryObj or {}
orderBy = f"{queryObj.get('orderBy', ['name', 'asc'])[0]},{queryObj.get('orderBy', ['name', 'asc'])[1]}"
@@ -55,9 +73,27 @@ def list(self, queryObj: Optional[AnalysisQuery] = None) -> List[AnalysisListIte
def create(self, analysisObj: AnalysisCreateInfo) -> Dict[str, GenericID | GenericToken]:
"""
- Create a new analyze
+ @description:
+ Creates a new analysis in your application.
+
+ @see:
+ https://help.tago.io/portal/en/kb/articles/120-creating-analysis Creating Analysis
+
+ @example:
+ If receive an error "Authorization Denied", check policy **Analysis** / **Create** in Access Management.
+ ```python
+ resources = Resources()
+ new_analysis = resources.analyses.create({
+ "name": "My Analysis",
+ "runtime": "python",
+ "tags": [{"key": "type", "value": "data-processing"}]
+ })
+ print(new_analysis["id"], new_analysis["token"]) # analysis-id-123, analysis-token-123
+ ```
- :param AnalysisCreateInfo analysisObj: Data object to create new TagoIO Analyze
+ :param AnalysisCreateInfo analysisObj: Data object to create new TagoIO Analysis
+ :return: Dictionary with the new analysis ID and token
+ :rtype: Dict[str, GenericID | GenericToken]
"""
result = self.doRequest(
{
@@ -70,10 +106,27 @@ def create(self, analysisObj: AnalysisCreateInfo) -> Dict[str, GenericID | Gener
def edit(self, analysisID: GenericID, analysisObj: AnalysisInfo) -> str:
"""
- Modify any property of the analyze
+ @description:
+ Modifies an existing analysis.
- :param GenericID analysisID: Analyze identification
- :param Partial[AnalysisInfo] analysisObj: Analyze Object with data to replace
+ @see:
+ https://docs.tago.io/docs/tagoio/analysis/ Analysis
+
+ @example:
+ If receive an error "Authorization Denied", check policy **Analysis** / **Create** in Access Management.
+ ```python
+ resources = Resources()
+ result = resources.analyses.edit("analysis-id-123", {
+ "name": "Updated Analysis",
+ "active": False
+ })
+ print(result) # Successfully Updated
+ ```
+
+ :param GenericID analysisID: Analysis identification
+ :param AnalysisInfo analysisObj: Analysis object with data to replace
+ :return: Success message
+ :rtype: str
"""
result = self.doRequest(
{
@@ -86,9 +139,23 @@ def edit(self, analysisID: GenericID, analysisObj: AnalysisInfo) -> str:
def delete(self, analysisID: GenericID) -> str:
"""
- Deletes an analyze from the account
+ @description:
+ Deletes an analysis from your application.
+
+ @see:
+ https://docs.tago.io/docs/tagoio/analysis/ Analysis
+
+ @example:
+ If receive an error "Authorization Denied", check policy **Analysis** / **Delete** in Access Management.
+ ```python
+ resources = Resources()
+ result = resources.analyses.delete("analysis-id-123")
+ print(result) # Successfully Removed
+ ```
- :param GenericID analysisID: Analyze identification
+ :param GenericID analysisID: Analysis identification
+ :return: Success message
+ :rtype: str
"""
result = self.doRequest(
{
@@ -100,9 +167,23 @@ def delete(self, analysisID: GenericID) -> str:
def info(self, analysisID: GenericID) -> AnalysisInfo:
"""
- Gets information about the analyze
+ @description:
+ Retrieves detailed information about a specific analysis.
+
+ @see:
+ https://docs.tago.io/docs/tagoio/analysis/ Analysis
+
+ @example:
+ If receive an error "Authorization Denied", check policy **Analysis** / **Access** in Access Management.
+ ```python
+ resources = Resources()
+ analysis_info = resources.analyses.info("analysis-id-123")
+ print(analysis_info) # {'id': 'analysis-id-123', 'name': 'My Analysis', ...}
+ ```
- :param GenericID analysisID: Analyze identification
+ :param GenericID analysisID: Analysis identification
+ :return: Detailed analysis information
+ :rtype: AnalysisInfo
"""
result = self.doRequest(
{
@@ -115,10 +196,24 @@ def info(self, analysisID: GenericID) -> AnalysisInfo:
def run(self, analysisID: GenericID, scopeObj: Optional[Dict[str, Any]] = None) -> Dict[str, GenericToken]:
"""
- Force analyze to run
+ @description:
+ Executes an analysis with optional scope parameters.
+
+ @see:
+ https://docs.tago.io/docs/tagoio/analysis/ Analysis
+
+ @example:
+ If receive an error "Authorization Denied", check policy **Analysis** / **Run Analysis** in Access Management.
+ ```python
+ resources = Resources()
+ result = resources.analyses.run("analysis-id-123", {"environment": "production"})
+ print(result["analysis_token"]) # analysis-token-123
+ ```
- :param GenericID analysisID: Analyze identification
+ :param GenericID analysisID: Analysis identification
:param Optional[Dict[str, Any]] scopeObj: Simulate scope for analysis
+ :return: Dictionary containing the analysis token
+ :rtype: Dict[str, GenericToken]
"""
result = self.doRequest(
{
@@ -131,9 +226,23 @@ def run(self, analysisID: GenericID, scopeObj: Optional[Dict[str, Any]] = None)
def tokenGenerate(self, analysisID: GenericID) -> Dict[str, str]:
"""
- Generate a new token for the analysis
+ @description:
+ Generates a new token for the analysis.
+ This is only allowed when the analysis is running in external mode.
+
+ @see:
+ https://docs.tago.io/docs/tagoio/analysis/ Analysis
+
+ @example:
+ ```python
+ resources = Resources()
+ token = resources.analyses.tokenGenerate("analysis-id-123")
+ print(token["analysis_token"]) # analysis-token-123
+ ```
- :param GenericID analysisID: Analyze identification
+ :param GenericID analysisID: Analysis identification
+ :return: Dictionary containing the new analysis token
+ :rtype: Dict[str, str]
"""
result = self.doRequest(
{
@@ -145,10 +254,29 @@ def tokenGenerate(self, analysisID: GenericID) -> Dict[str, str]:
def uploadScript(self, analysisID: GenericID, fileObj: ScriptFile) -> str:
"""
- Upload a file (base64) to Analysis. Automatically erase the old one
+ @description:
+ Uploads a script file to an analysis.
+ The file content must be base64-encoded. This automatically replaces the old script.
+
+ @see:
+ https://docs.tago.io/docs/tagoio/analysis/ Analysis
- :param GenericID analysisID: Analyze identification
- :param ScriptFile fileObj: Object with name, language and content of the file
+ @example:
+ If receive an error "Authorization Denied", check policy **Analysis** / **Upload Analysis Script** in Access Management.
+ ```python
+ resources = Resources()
+ result = resources.analyses.uploadScript("analysis-id-123", {
+ "name": "script.py",
+ "content": "base64-encoded-content",
+ "language": "python"
+ })
+ print(result) # Successfully Uploaded
+ ```
+
+ :param GenericID analysisID: Analysis identification
+ :param ScriptFile fileObj: Object with name, language and content (base64) of the file
+ :return: Success message
+ :rtype: str
"""
result = self.doRequest(
{
@@ -163,12 +291,32 @@ def uploadScript(self, analysisID: GenericID, fileObj: ScriptFile) -> str:
)
return result
- def downloadScript(self, analysisID: GenericID, options: Optional[Dict[Literal["version"], int]] = None) -> Dict:
+ def downloadScript(
+ self,
+ analysisID: GenericID,
+ options: Optional[Dict[Literal["version"], int]] = None,
+ ) -> Dict:
"""
- Get a url to download the analysis. If `version` is specified in `options`, downloads a specific version.
+ @description:
+ Gets a download URL for the analysis script.
+ If version is specified in options, downloads a specific version.
+
+ @see:
+ https://docs.tago.io/docs/tagoio/analysis/ Analysis
+
+ @example:
+ If receive an error "Authorization Denied", check policy **Analysis** / **Download Analysis Script** in Access Management.
+ ```python
+ resources = Resources()
+ download = resources.analyses.downloadScript("analysis-id-123", {"version": 1})
+ print(download["url"]) # https://...
+ print(download["expire_at"]) # 2025-01-13T...
+ ```
:param GenericID analysisID: Analysis identification
- :param Optional[Dict[str, int]] options: Options for the Analysis script to download
+ :param Optional[Dict[str, int]] options: Options for the Analysis script to download (e.g., {"version": 1})
+ :return: Dictionary with download URL, size information, and expiration date
+ :rtype: Dict
"""
version = options.get("version") if options else None
@@ -181,3 +329,73 @@ def downloadScript(self, analysisID: GenericID, options: Optional[Dict[Literal["
)
result = dateParser(result, ["expire_at"])
return result
+
+ def listSnippets(self, runtime: SnippetRuntime) -> SnippetsListResponse:
+ """
+ @description:
+ Get all available snippets for a specific runtime environment.
+ Fetches analysis code snippets from the public TagoIO snippets repository.
+
+ @see:
+ https://help.tago.io/portal/en/kb/articles/64-script-examples Script Examples
+ https://help.tago.io/portal/en/kb/articles/104-script-editor Script Editor
+
+ @example:
+ ```python
+ resources = Resources()
+ python_snippets = resources.analyses.listSnippets("python-rt2025")
+
+ # Print all snippet titles
+ for snippet in python_snippets["snippets"]:
+ print(f"{snippet['title']}: {snippet['description']}")
+ ```
+
+ :param SnippetRuntime runtime: The runtime environment to get snippets for
+ :return: Snippets metadata including runtime, schema version, and list of available snippets
+ :rtype: SnippetsListResponse
+ """
+ url = f"{SNIPPETS_BASE_URL}/{runtime}.json"
+
+ try:
+ response = requests.get(url, headers={"Accept": "*/*"}, timeout=10)
+ response.raise_for_status()
+ return response.json()
+ except requests.exceptions.RequestException as e:
+ raise RuntimeError(f"Failed to fetch snippets: {e}") from e
+
+ def getSnippetFile(self, runtime: SnippetRuntime, filename: str) -> str:
+ """
+ @description:
+ Get the raw source code content of a specific snippet file.
+ Fetches the actual code content from the TagoIO snippets repository.
+
+ @see:
+ https://help.tago.io/portal/en/kb/articles/64-script-examples Script Examples
+ https://help.tago.io/portal/en/kb/articles/104-script-editor Script Editor
+
+ @example:
+ ```python
+ resources = Resources()
+
+ # Get TypeScript code for console example
+ code = resources.analyses.getSnippetFile("deno-rt2025", "console.ts")
+ print(code)
+
+ # Get Python code for data processing
+ python_code = resources.analyses.getSnippetFile("python-rt2025", "avg-min-max.py")
+ print(python_code)
+ ```
+
+ :param SnippetRuntime runtime: The runtime environment the snippet belongs to
+ :param str filename: The filename of the snippet to retrieve
+ :return: Raw file content as string
+ :rtype: str
+ """
+ url = f"{SNIPPETS_BASE_URL}/{runtime}/{filename}"
+
+ try:
+ response = requests.get(url, headers={"Accept": "*/*"}, timeout=10)
+ response.raise_for_status()
+ return response.text
+ except requests.exceptions.RequestException as e:
+ raise RuntimeError(f"Failed to fetch snippet file: {e}") from e
diff --git a/src/tagoio_sdk/modules/Resources/Analysis_Types.py b/src/tagoio_sdk/modules/Resources/Analysis_Types.py
index 3ab903d..4d32035 100644
--- a/src/tagoio_sdk/modules/Resources/Analysis_Types.py
+++ b/src/tagoio_sdk/modules/Resources/Analysis_Types.py
@@ -44,7 +44,11 @@ class AnalysisInfo(AnalysisCreateInfo):
class AnalysisQuery(Query):
- fields: Optional[List[Literal["name", "active", "run_on", "last_run", "created_at", "updated_at"]]]
+ fields: Optional[
+ List[
+ Literal["name", "active", "run_on", "last_run", "created_at", "updated_at"]
+ ]
+ ]
class AnalysisListItem(TypedDict, total=False):
@@ -57,3 +61,41 @@ class AnalysisListItem(TypedDict, total=False):
updated_at: Optional[str]
locked_at: Optional[str]
console: Optional[List[str]]
+
+
+SnippetRuntime = Literal[
+ "node-legacy", "python-legacy", "node-rt2025", "python-rt2025", "deno-rt2025"
+]
+"""Available runtime environments for snippets"""
+
+
+class SnippetItem(TypedDict):
+ """Individual snippet metadata"""
+
+ id: str
+ """Unique identifier for the snippet"""
+ title: str
+ """Human-readable title"""
+ description: str
+ """Description of what the snippet does"""
+ language: str
+ """Programming language (typescript, javascript, python)"""
+ tags: List[str]
+ """Array of tags for categorization"""
+ filename: str
+ """Filename of the snippet"""
+ file_path: str
+ """Full path to the file in the runtime directory"""
+
+
+class SnippetsListResponse(TypedDict):
+ """API response containing all snippets metadata for a runtime"""
+
+ runtime: SnippetRuntime
+ """Runtime environment identifier"""
+ schema_version: int
+ """Schema version for the API response format"""
+ generated_at: str
+ """ISO timestamp when the response was generated"""
+ snippets: List[SnippetItem]
+ """Array of all available snippets for this runtime"""
diff --git a/src/tagoio_sdk/regions.py b/src/tagoio_sdk/regions.py
index 7df30f2..128d77b 100644
--- a/src/tagoio_sdk/regions.py
+++ b/src/tagoio_sdk/regions.py
@@ -1,55 +1,127 @@
import os
-from contextlib import suppress
from typing import Literal
from typing import Optional
from typing import TypedDict
+from typing import Union
-class RegionDefinition(TypedDict):
+class RegionsObjApi(TypedDict):
+ """Region configuration with API/SSE endpoints."""
+
api: str
- realtime: str
sse: str
-# noRegionWarning = False
+class RegionsObjTDeploy(TypedDict):
+ """Region configuration with TagoIO Deploy Project ID."""
+
+ tdeploy: str
+
+
+RegionsObj = Union[RegionsObjApi, RegionsObjTDeploy]
+"""Region configuration object (either API/SSE pair or TDeploy)"""
+
+Regions = Literal["us-e1", "eu-w1", "env"]
+"""Supported TagoIO regions"""
-regionsDefinition = {
- "usa-1": {
+# Runtime region cache
+runtimeRegion: Optional[RegionsObj] = None
+
+# Object of Regions Definition
+regionsDefinition: dict[str, Optional[RegionsObjApi]] = {
+ "us-e1": {
"api": "https://api.tago.io",
- "realtime": "wss://realtime.tago.io",
"sse": "https://sse.tago.io/events",
},
- "env": None, # ? process object should be on trycatch.
+ "eu-w1": {
+ "api": "https://api.eu-w1.tago.io",
+ "sse": "https://sse.eu-w1.tago.io/events",
+ },
+ "env": None, # process object should be on trycatch
}
-Regions = Literal["usa-1", "env"]
+def getConnectionURI(region: Optional[Union[Regions, RegionsObj]] = None) -> RegionsObjApi:
+ """
+ Get connection URI for API and SSE.
+
+ Args:
+ region: Region identifier or configuration object
+
+ Returns:
+ Region configuration with API and SSE endpoints
+
+ Raises:
+ ReferenceError: If invalid region is specified
+ """
+ global runtimeRegion
-def getConnectionURI(region: Optional[Regions]) -> RegionDefinition:
- value = None
- with suppress(KeyError):
- value = regionsDefinition[region]
+ # Handle tdeploy in RegionsObj - takes precedence
+ if isinstance(region, dict) and "tdeploy" in region:
+ tdeploy = region["tdeploy"].strip()
+ if tdeploy:
+ return {
+ "api": f"https://api.{tdeploy}.tagoio.net",
+ "sse": f"https://sse.{tdeploy}.tagoio.net/events",
+ }
+
+ normalized_region = region
+ if isinstance(normalized_region, str) and normalized_region == "usa-1":
+ normalized_region = "us-e1"
+
+ value: Optional[RegionsObjApi] = None
+ if isinstance(normalized_region, str):
+ value = regionsDefinition.get(normalized_region)
+ elif isinstance(normalized_region, dict):
+ # If it's already a RegionsObj with api/sse, use it
+ if "api" in normalized_region and "sse" in normalized_region:
+ value = normalized_region
if value is not None:
return value
+ if runtimeRegion is not None:
+ return runtimeRegion
+
if region is not None and region != "env":
- raise Exception(f"> TagoIO-SDK: Invalid region {region}.")
+ raise ReferenceError(f"> TagoIO-SDK: Invalid region {region}.")
try:
api = os.environ.get("TAGOIO_API")
- realtime = os.environ.get("TAGOIO_REALTIME")
sse = os.environ.get("TAGOIO_SSE")
if not api and region != "env":
raise Exception("Invalid Env")
- return {"api": api, "realtime": realtime, "sse": sse}
+ return {"api": api or "", "sse": sse or ""}
except Exception:
- # global noRegionWarning
- # if noRegionWarning is False:
+ # if not noRegionWarning:
# print("> TagoIO-SDK: No region or env defined, using fallback as usa-1.")
# noRegionWarning = True
- return regionsDefinition["usa-1"]
+ return regionsDefinition["us-e1"]
+
+
+def setRuntimeRegion(region: RegionsObj) -> None:
+ """
+ Set region in-memory to be inherited by other modules when set in the Analysis runtime
+ with `Analysis.use()`.
+
+ Example:
+ ```python
+ def my_analysis(context, scope):
+ # this uses the region defined through `use`
+ resources = Resources({"token": token})
+
+ # it's still possible to override if needed
+ europe_resources = Resources({"token": token, "region": "eu-w1"})
+
+ Analysis.use(my_analysis, {"region": "us-e1"})
+ ```
+
+ Args:
+ region: Region configuration object
+ """
+ global runtimeRegion
+ runtimeRegion = region
diff --git a/tests/Regions/test_tdeploy.py b/tests/Regions/test_tdeploy.py
new file mode 100644
index 0000000..166bf03
--- /dev/null
+++ b/tests/Regions/test_tdeploy.py
@@ -0,0 +1,92 @@
+from tagoio_sdk.regions import getConnectionURI
+
+"""Test suite for TagoIO Deploy (tdeploy) Region Support"""
+
+
+def testShouldGenerateCorrectEndpointsForTdeployRegion():
+ """Should generate correct endpoints for tdeploy region"""
+ tdeploy = "68951c0e023862b2aea00f3f"
+ region = {"tdeploy": tdeploy}
+
+ result = getConnectionURI(region)
+
+ assert result["api"] == f"https://api.{tdeploy}.tagoio.net"
+ assert result["sse"] == f"https://sse.{tdeploy}.tagoio.net/events"
+
+
+def testShouldPrioritizeTdeployOverOtherFields():
+ """Should prioritize tdeploy over other fields when both are provided"""
+ tdeploy = "68951c0e023862b2aea00f3f"
+ # mixing api/sse with tdeploy is no longer allowed by types;
+ # pass only tdeploy and ensure correct priority handling remains
+ region = {"tdeploy": tdeploy}
+
+ result = getConnectionURI(region)
+
+ assert result["api"] == f"https://api.{tdeploy}.tagoio.net"
+ assert result["sse"] == f"https://sse.{tdeploy}.tagoio.net/events"
+
+
+def testShouldTrimWhitespaceFromTdeployValue():
+ """Should trim whitespace from tdeploy value"""
+ tdeploy = " 68951c0e023862b2aea00f3f "
+ region = {"tdeploy": tdeploy}
+
+ result = getConnectionURI(region)
+
+ assert result["api"] == "https://api.68951c0e023862b2aea00f3f.tagoio.net"
+ assert result["sse"] == "https://sse.68951c0e023862b2aea00f3f.tagoio.net/events"
+
+
+def testShouldFallbackToStandardBehaviorWhenTdeployIsEmpty():
+ """Should fallback to standard behavior when tdeploy is empty"""
+ region = {
+ "tdeploy": "",
+ "api": "https://custom-api.example.com",
+ "sse": "https://custom-sse.example.com",
+ }
+
+ # Empty tdeploy should fallback to api/sse fields
+ result = getConnectionURI(region)
+
+ assert result["api"] == "https://custom-api.example.com"
+ assert result["sse"] == "https://custom-sse.example.com"
+
+
+def testShouldFallbackToStandardBehaviorWhenTdeployIsWhitespaceOnly():
+ """Should fallback to standard behavior when tdeploy is whitespace only"""
+ region = {
+ "tdeploy": " ",
+ "api": "https://custom-api.example.com",
+ "sse": "https://custom-sse.example.com",
+ }
+
+ # Whitespace-only tdeploy should fallback to api/sse fields
+ result = getConnectionURI(region)
+
+ assert result["api"] == "https://custom-api.example.com"
+ assert result["sse"] == "https://custom-sse.example.com"
+
+
+def testShouldMaintainBackwardCompatibilityWithExistingRegions():
+ """Should maintain backward compatibility with existing regions"""
+ result1 = getConnectionURI("us-e1")
+ assert result1["api"] == "https://api.tago.io"
+ assert result1["sse"] == "https://sse.tago.io/events"
+
+ result2 = getConnectionURI("eu-w1")
+ assert result2["api"] == "https://api.eu-w1.tago.io"
+ assert result2["sse"] == "https://sse.eu-w1.tago.io/events"
+
+
+def testShouldMaintainBackwardCompatibilityWithCustomRegionsObj():
+ """Should maintain backward compatibility with custom RegionsObj"""
+ customRegion = {
+ "api": "https://my-api.com",
+ "sse": "https://my-sse.com",
+ }
+
+ result = getConnectionURI(customRegion)
+
+ assert result["api"] == "https://my-api.com"
+ assert result["sse"] == "https://my-sse.com"
diff --git a/tests/Resources/test_analyses.py b/tests/Resources/test_analyses.py
index bbd36c7..e031223 100644
--- a/tests/Resources/test_analyses.py
+++ b/tests/Resources/test_analyses.py
@@ -1,8 +1,13 @@
import os
+
from requests_mock.mocker import Mocker
+from tagoio_sdk.modules.Resources.Analysis_Types import AnalysisCreateInfo
+from tagoio_sdk.modules.Resources.Analysis_Types import AnalysisInfo
+from tagoio_sdk.modules.Resources.Analysis_Types import AnalysisListItem
+from tagoio_sdk.modules.Resources.Analysis_Types import ScriptFile
from tagoio_sdk.modules.Resources.Resources import Resources
-from tagoio_sdk.modules.Resources.Analysis_Types import AnalysisCreateInfo, AnalysisInfo, ScriptFile, AnalysisListItem
+
os.environ["T_ANALYSIS_TOKEN"] = "your_token_value"
@@ -18,7 +23,7 @@ def mockAnalysisList() -> list[AnalysisListItem]:
"updated_at": "2023-03-07T01:43:45.952Z",
"last_run": "2023-03-07T01:43:45.952Z",
}
- ]
+ ],
}
@@ -31,18 +36,12 @@ def mockAnalysisInfo() -> AnalysisInfo:
"created_at": "2023-03-07T01:43:45.952Z",
"updated_at": "2023-03-07T01:43:45.952Z",
"last_run": "2023-03-07T01:43:45.952Z",
- }
+ },
}
def mockCreateAnalysis() -> dict:
- return {
- "status": True,
- "result": {
- "id": "analysis_id",
- "token": "analysis_token"
- }
- }
+ return {"status": True, "result": {"id": "analysis_id", "token": "analysis_token"}}
def testAnalysesMethodList(requests_mock: Mocker) -> None:
@@ -78,7 +77,10 @@ def testAnalysesMethodEdit(requests_mock: Mocker) -> None:
:param requests_mock are a plugin of pytest to mock the requests.
"""
analysis_data = AnalysisInfo(name="Updated Analysis")
- requests_mock.put("https://api.tago.io/analysis/analysis_id", json={"status": True, "result": "success"})
+ requests_mock.put(
+ "https://api.tago.io/analysis/analysis_id",
+ json={"status": True, "result": "success"},
+ )
resources = Resources()
response = resources.analysis.edit("analysis_id", analysis_data)
@@ -91,7 +93,10 @@ def testAnalysesMethodDelete(requests_mock: Mocker) -> None:
"""
:param requests_mock are a plugin of pytest to mock the requests.
"""
- requests_mock.delete("https://api.tago.io/analysis/analysis_id", json={"status": True, "result": "success"})
+ requests_mock.delete(
+ "https://api.tago.io/analysis/analysis_id",
+ json={"status": True, "result": "success"},
+ )
resources = Resources()
response = resources.analysis.delete("analysis_id")
@@ -104,7 +109,9 @@ def testAnalysesMethodInfo(requests_mock: Mocker) -> None:
"""
:param requests_mock are a plugin of pytest to mock the requests.
"""
- requests_mock.get("https://api.tago.io/analysis/analysis_id", json=mockAnalysisInfo())
+ requests_mock.get(
+ "https://api.tago.io/analysis/analysis_id", json=mockAnalysisInfo()
+ )
resources = Resources()
response = resources.analysis.info("analysis_id")
@@ -117,7 +124,10 @@ def testAnalysesMethodRun(requests_mock: Mocker) -> None:
"""
:param requests_mock are a plugin of pytest to mock the requests.
"""
- requests_mock.post("https://api.tago.io/analysis/analysis_id/run", json={"status": True, "result": {"token": "run_token"}})
+ requests_mock.post(
+ "https://api.tago.io/analysis/analysis_id/run",
+ json={"status": True, "result": {"token": "run_token"}},
+ )
resources = Resources()
response = resources.analysis.run("analysis_id")
@@ -130,7 +140,10 @@ def testAnalysesMethodTokenGenerate(requests_mock: Mocker) -> None:
"""
:param requests_mock are a plugin of pytest to mock the requests.
"""
- requests_mock.get("https://api.tago.io/analysis/analysis_id/token", json={"status": True, "result": {"token": "new_token"}})
+ requests_mock.get(
+ "https://api.tago.io/analysis/analysis_id/token",
+ json={"status": True, "result": {"token": "new_token"}},
+ )
resources = Resources()
response = resources.analysis.tokenGenerate("analysis_id")
@@ -143,8 +156,13 @@ def testAnalysesMethodUploadScript(requests_mock: Mocker) -> None:
"""
:param requests_mock are a plugin of pytest to mock the requests.
"""
- script_file = ScriptFile(name="script.js", language="node", content="console.log('Hello, World!');")
- requests_mock.post("https://api.tago.io/analysis/analysis_id/upload", json={"status": True, "result": "success"})
+ script_file = ScriptFile(
+ name="script.js", language="node", content="console.log('Hello, World!');"
+ )
+ requests_mock.post(
+ "https://api.tago.io/analysis/analysis_id/upload",
+ json={"status": True, "result": "success"},
+ )
resources = Resources()
response = resources.analysis.uploadScript("analysis_id", script_file)
@@ -157,10 +175,120 @@ def testAnalysesMethodDownloadScript(requests_mock: Mocker) -> None:
"""
:param requests_mock are a plugin of pytest to mock the requests.
"""
- requests_mock.get("https://api.tago.io/analysis/analysis_id/download", json={"status": True, "result": {"url": "https://download.url"}})
+ requests_mock.get(
+ "https://api.tago.io/analysis/analysis_id/download",
+ json={"status": True, "result": {"url": "https://download.url"}},
+ )
resources = Resources()
response = resources.analysis.downloadScript("analysis_id")
assert response == {"url": "https://download.url"}
assert isinstance(response, dict)
+
+
+def testAnalysesMethodListSnippets(requests_mock: Mocker) -> None:
+ """
+ Test listSnippets method to retrieve all available snippets for a runtime.
+ :param requests_mock are a plugin of pytest to mock the requests.
+ """
+ mock_snippets_response = {
+ "runtime": "python-rt2025",
+ "schema_version": 1,
+ "generated_at": "2025-01-13T12:00:00Z",
+ "snippets": [
+ {
+ "id": "console-example",
+ "title": "Console Example",
+ "description": "Basic console logging example",
+ "language": "python",
+ "tags": ["basics", "logging"],
+ "filename": "console.py",
+ "file_path": "python-rt2025/console.py",
+ },
+ {
+ "id": "data-processing",
+ "title": "Data Processing",
+ "description": "Process device data",
+ "language": "python",
+ "tags": ["data", "processing"],
+ "filename": "process-data.py",
+ "file_path": "python-rt2025/process-data.py",
+ },
+ ],
+ }
+
+ requests_mock.get(
+ "https://snippets.tago.io/python-rt2025.json", json=mock_snippets_response
+ )
+
+ resources = Resources()
+ response = resources.analysis.listSnippets("python-rt2025")
+
+ assert response["runtime"] == "python-rt2025"
+ assert response["schema_version"] == 1
+ assert len(response["snippets"]) == 2
+ assert response["snippets"][0]["id"] == "console-example"
+ assert response["snippets"][0]["title"] == "Console Example"
+ assert isinstance(response, dict)
+ assert isinstance(response["snippets"], list)
+
+
+def testAnalysesMethodGetSnippetFile(requests_mock: Mocker) -> None:
+ """
+ Test getSnippetFile method to retrieve raw snippet file content.
+ :param requests_mock are a plugin of pytest to mock the requests.
+ """
+ mock_file_content = """# Console Example
+from tagoio_sdk import Analysis
+
+def my_analysis(context):
+ context.log("Hello from Python snippet!")
+
+Analysis(my_analysis)
+"""
+
+ requests_mock.get(
+ "https://snippets.tago.io/python-rt2025/console.py", text=mock_file_content
+ )
+
+ resources = Resources()
+ response = resources.analysis.getSnippetFile("python-rt2025", "console.py")
+
+ assert "Console Example" in response
+ assert "context.log" in response
+ assert isinstance(response, str)
+
+
+def testAnalysesMethodListSnippetsError(requests_mock: Mocker) -> None:
+ """
+ Test listSnippets method error handling when request fails.
+ :param requests_mock are a plugin of pytest to mock the requests.
+ """
+ requests_mock.get("https://snippets.tago.io/invalid-runtime.json", status_code=404)
+
+ resources = Resources()
+
+ try:
+ resources.analysis.listSnippets("invalid-runtime")
+ raise AssertionError("Expected RuntimeError to be raised")
+ except RuntimeError as e:
+ assert "Failed to fetch snippets" in str(e)
+
+
+def testAnalysesMethodGetSnippetFileError(requests_mock: Mocker) -> None:
+ """
+ Test getSnippetFile method error handling when file not found.
+ :param requests_mock are a plugin of pytest to mock the requests.
+ """
+ requests_mock.get(
+ "https://snippets.tago.io/python-rt2025/nonexistent.py", status_code=404
+ )
+
+ resources = Resources()
+
+ try:
+ resources.analysis.getSnippetFile("python-rt2025", "nonexistent.py")
+ raise AssertionError("Expected RuntimeError to be raised")
+ except RuntimeError as e:
+ assert "Failed to fetch snippet file" in str(e)