From 8bfc8f8c0a23444b1f748de594e4759377f0e02e Mon Sep 17 00:00:00 2001 From: Ahmed Ali Date: Sun, 23 Nov 2025 06:28:44 +0100 Subject: [PATCH 1/3] feat: add saveToFile tool --- components.json | 4 +- lfx/src/lfx/components/data/save_file.py | 418 +++++++++++++++++++++-- 2 files changed, 401 insertions(+), 21 deletions(-) diff --git a/components.json b/components.json index 9498c72..0cebc1f 100644 --- a/components.json +++ b/components.json @@ -7,6 +7,6 @@ "SearchAPIComponent": "lfx.components.tools.search_api", "SerpAPIComponent": "lfx.components.tools.serp_api", "TavilySearchToolComponent": "lfx.components.tools.tavily_search_tool", - "WikipediaAPIComponent": "lfx.components.tools.wikipedia_api" + "WikipediaAPIComponent": "lfx.components.tools.wikipedia_api", + "SaveToFileComponent": "lfx.components.data.save_file" } - diff --git a/lfx/src/lfx/components/data/save_file.py b/lfx/src/lfx/components/data/save_file.py index 77e6769..b40d5e4 100644 --- a/lfx/src/lfx/components/data/save_file.py +++ b/lfx/src/lfx/components/data/save_file.py @@ -318,14 +318,87 @@ async def _upload_file(self, file_path: Path) -> None: def _save_dataframe(self, dataframe: DataFrame, path: Path, fmt: str) -> str: """Save a DataFrame to the specified file format.""" + file_exists = path.exists() + print(f"[SaveFile] 💾 Writing DataFrame to {fmt} file: exists={file_exists}, mode={'append rows' if file_exists else 'create'}", flush=True) + print(f"[SaveFile] 💾 DataFrame shape: {dataframe.shape}", flush=True) + if fmt == "csv": - dataframe.to_csv(path, index=False) + if file_exists: + try: + existing_df = pd.read_csv(path) + print(f"[SaveFile] 🔄 Read existing CSV with {len(existing_df)} row(s)", flush=True) + combined_df = pd.concat([existing_df, dataframe], ignore_index=True) + combined_df.to_csv(path, index=False) + print(f"[SaveFile] 💾 ✅ Appended rows to CSV (now {len(combined_df)} rows): {path}", flush=True) + except Exception as e: + print(f"[SaveFile] ⚠️ Error reading existing CSV: {e}, creating new file", flush=True) + dataframe.to_csv(path, index=False) + print(f"[SaveFile] 💾 ✅ Created new CSV file: {path}", flush=True) + else: + dataframe.to_csv(path, index=False) + print(f"[SaveFile] 💾 ✅ Created new CSV file: {path}", flush=True) elif fmt == "excel": - dataframe.to_excel(path, index=False, engine="openpyxl") + if file_exists: + try: + existing_df = pd.read_excel(path, engine="openpyxl") + print(f"[SaveFile] 🔄 Read existing Excel with {len(existing_df)} row(s)", flush=True) + combined_df = pd.concat([existing_df, dataframe], ignore_index=True) + combined_df.to_excel(path, index=False, engine="openpyxl") + print(f"[SaveFile] 💾 ✅ Appended rows to Excel (now {len(combined_df)} rows): {path}", flush=True) + except Exception as e: + print(f"[SaveFile] ⚠️ Error reading existing Excel: {e}, creating new file", flush=True) + dataframe.to_excel(path, index=False, engine="openpyxl") + print(f"[SaveFile] 💾 ✅ Created new Excel file: {path}", flush=True) + else: + dataframe.to_excel(path, index=False, engine="openpyxl") + print(f"[SaveFile] 💾 ✅ Created new Excel file: {path}", flush=True) elif fmt == "json": - dataframe.to_json(path, orient="records", indent=2) + if file_exists: + try: + with path.open("r", encoding="utf-8") as f: + existing_content = f.read().strip() + + json_array = [] + if existing_content.startswith("["): + json_array = json.loads(existing_content) + print(f"[SaveFile] 🔄 Read existing JSON array with {len(json_array)} item(s)", flush=True) + elif existing_content.startswith("{"): + json_array = [json.loads(existing_content)] + print(f"[SaveFile] 🔄 Converted single JSON object to array", flush=True) + + # Convert DataFrame to list of dicts and append + df_records = dataframe.to_dict(orient="records") + json_array.extend(df_records) + + with path.open("w", encoding="utf-8") as f: + f.write(json.dumps(json_array, indent=2, ensure_ascii=False)) + print(f"[SaveFile] 💾 ✅ Appended DataFrame rows to JSON array (now {len(json_array)} items): {path}", flush=True) + except Exception as e: + print(f"[SaveFile] ⚠️ Error reading existing JSON: {e}, creating new file", flush=True) + dataframe.to_json(path, orient="records", indent=2) + print(f"[SaveFile] 💾 ✅ Created new JSON file: {path}", flush=True) + else: + dataframe.to_json(path, orient="records", indent=2) + print(f"[SaveFile] 💾 ✅ Created new JSON file: {path}", flush=True) elif fmt == "markdown": - path.write_text(dataframe.to_markdown(index=False), encoding="utf-8") + if file_exists: + try: + with path.open("r", encoding="utf-8") as f: + existing_content = f.read() + + new_markdown = dataframe.to_markdown(index=False) + with path.open("a", encoding="utf-8") as f: + if existing_content and not existing_content.endswith("\n"): + f.write("\n") + f.write(f"\n{new_markdown}\n") + print(f"[SaveFile] 💾 ✅ Appended markdown table to existing file: {path}", flush=True) + except Exception as e: + print(f"[SaveFile] ⚠️ Error reading existing markdown: {e}, creating new file", flush=True) + path.write_text(dataframe.to_markdown(index=False), encoding="utf-8") + print(f"[SaveFile] 💾 ✅ Created new markdown file: {path}", flush=True) + else: + path.write_text(dataframe.to_markdown(index=False), encoding="utf-8") + print(f"[SaveFile] 💾 ✅ Created new markdown file: {path}", flush=True) else: msg = f"Unsupported DataFrame format: {fmt}" raise ValueError(msg) @@ -333,16 +406,177 @@ def _save_dataframe(self, dataframe: DataFrame, path: Path, fmt: str) -> str: def _save_data(self, data: Data, path: Path, fmt: str) -> str: """Save a Data object to the specified file format.""" + print(f"[SaveFile] 💾 _save_data called: format={fmt}, path={path}", flush=True) + + # CRITICAL: Skip if content is empty (don't fail, just skip writing) + # Check if Data object has meaningful content + text_content = getattr(data, "text", None) or (data.data.get("text") if isinstance(data.data, dict) else None) + data_dict = data.data if isinstance(data.data, dict) else {} + + print(f"[SaveFile] 💾 Data.text: {repr(text_content)[:200] if text_content else 'None'}", flush=True) + print(f"[SaveFile] 💾 Data.data: {repr(data_dict)[:200] if data_dict else 'None'}", flush=True) + + # Check if text is empty or only whitespace + is_text_empty = not text_content or (isinstance(text_content, str) and text_content.strip() == "") + + # Check if data dict is empty or only contains empty values + is_data_empty = not data_dict or all( + not v or (isinstance(v, str) and v.strip() == "") + for v in data_dict.values() + if v is not None + ) + + if is_text_empty and is_data_empty: + print(f"[SaveFile] ⚠️ SKIP: Data content is empty, skipping write. data.data={repr(data.data)}, text={repr(text_content)}", flush=True) + logger.info(f"[SaveFile] Skipping write to '{path}' - content is empty") + return f"Skipped writing empty content to '{path}'" + + file_exists = path.exists() + print(f"[SaveFile] 💾 Writing Data to {fmt} file: exists={file_exists}", flush=True) + if fmt == "csv": - pd.DataFrame(data.data).to_csv(path, index=False) + file_exists = path.exists() + new_df = pd.DataFrame([data.data]) # Convert single data dict to DataFrame + print(f"[SaveFile] 💾 Writing Data to csv file: exists={file_exists}, mode={'append rows' if file_exists else 'create'}", flush=True) + print(f"[SaveFile] 💾 CSV data to add: {data.data}", flush=True) + + if file_exists: + try: + # Read existing CSV + existing_df = pd.read_csv(path) + print(f"[SaveFile] 🔄 Read existing CSV with {len(existing_df)} row(s)", flush=True) + # Append new row(s) + combined_df = pd.concat([existing_df, new_df], ignore_index=True) + combined_df.to_csv(path, index=False) + print(f"[SaveFile] 💾 ✅ Appended row to CSV (now {len(combined_df)} rows): {path}", flush=True) + except Exception as e: + print(f"[SaveFile] ⚠️ Error reading existing CSV: {e}, creating new file", flush=True) + # If reading fails, create new file + new_df.to_csv(path, index=False) + print(f"[SaveFile] 💾 ✅ Created new CSV file: {path}", flush=True) + else: + new_df.to_csv(path, index=False) + print(f"[SaveFile] 💾 ✅ Created new CSV file: {path}", flush=True) elif fmt == "excel": - pd.DataFrame(data.data).to_excel(path, index=False, engine="openpyxl") + file_exists = path.exists() + new_df = pd.DataFrame([data.data]) # Convert single data dict to DataFrame + print(f"[SaveFile] 💾 Writing Data to excel file: exists={file_exists}, mode={'append rows' if file_exists else 'create'}", flush=True) + print(f"[SaveFile] 💾 Excel data to add: {data.data}", flush=True) + + if file_exists: + try: + # Read existing Excel file + existing_df = pd.read_excel(path, engine="openpyxl") + print(f"[SaveFile] 🔄 Read existing Excel with {len(existing_df)} row(s)", flush=True) + # Append new row(s) + combined_df = pd.concat([existing_df, new_df], ignore_index=True) + combined_df.to_excel(path, index=False, engine="openpyxl") + print(f"[SaveFile] 💾 ✅ Appended row to Excel (now {len(combined_df)} rows): {path}", flush=True) + except Exception as e: + print(f"[SaveFile] ⚠️ Error reading existing Excel: {e}, creating new file", flush=True) + # If reading fails, create new file + new_df.to_excel(path, index=False, engine="openpyxl") + print(f"[SaveFile] 💾 ✅ Created new Excel file: {path}", flush=True) + else: + new_df.to_excel(path, index=False, engine="openpyxl") + print(f"[SaveFile] 💾 ✅ Created new Excel file: {path}", flush=True) elif fmt == "json": - path.write_text( - orjson.dumps(jsonable_encoder(data.data), option=orjson.OPT_INDENT_2).decode("utf-8"), encoding="utf-8" - ) + file_exists = path.exists() + # For JSON, use JSON array format for valid JSON file + new_item = jsonable_encoder(data.data) + print(f"[SaveFile] 💾 Writing Data to json file: exists={file_exists}, mode={'append to array' if file_exists else 'create array'}", flush=True) + print(f"[SaveFile] 💾 JSON content to add: {json.dumps(new_item)[:200]}", flush=True) + + if file_exists: + # Read existing file and parse as JSON array + try: + with path.open("r", encoding="utf-8") as f: + existing_content = f.read().strip() + + json_array = [] + + # Try to parse as JSON array + if existing_content.startswith("["): + try: + json_array = json.loads(existing_content) + print(f"[SaveFile] 🔄 Read existing JSON array with {len(json_array)} item(s)", flush=True) + except json.JSONDecodeError as e: + print(f"[SaveFile] ⚠️ Error parsing JSON array: {e}, creating new array", flush=True) + json_array = [] + + # Try to parse as single JSON object (convert to array) + elif existing_content.startswith("{"): + try: + obj = json.loads(existing_content) + json_array = [obj] + print(f"[SaveFile] 🔄 Converted single JSON object to array", flush=True) + except json.JSONDecodeError as e: + print(f"[SaveFile] ⚠️ Error parsing JSON object: {e}, creating new array", flush=True) + json_array = [] + + # Try to parse as JSONL (one object per line) + else: + lines = existing_content.split("\n") + for line in lines: + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + json_array.append(obj) + except json.JSONDecodeError: + pass + if json_array: + print(f"[SaveFile] 🔄 Converted JSONL format to array with {len(json_array)} item(s)", flush=True) + + # Add new item to array + json_array.append(new_item) + + # Write back as formatted JSON array + with path.open("w", encoding="utf-8") as f: + f.write(json.dumps(json_array, indent=2, ensure_ascii=False)) + print(f"[SaveFile] 💾 ✅ Appended to JSON array (now {len(json_array)} items): {path}", flush=True) + + except Exception as e: + print(f"[SaveFile] ⚠️ Error reading existing file: {e}, creating new array", flush=True) + # Create new array with the item + json_array = [new_item] + with path.open("w", encoding="utf-8") as f: + f.write(json.dumps(json_array, indent=2, ensure_ascii=False)) + print(f"[SaveFile] 💾 ✅ Created new JSON array file: {path}", flush=True) + else: + # Create new file with JSON array format + json_array = [new_item] + with path.open("w", encoding="utf-8") as f: + f.write(json.dumps(json_array, indent=2, ensure_ascii=False)) + print(f"[SaveFile] 💾 ✅ Created new JSON array file with Data: {path}", flush=True) elif fmt == "markdown": - path.write_text(pd.DataFrame(data.data).to_markdown(index=False), encoding="utf-8") + file_exists = path.exists() + new_df = pd.DataFrame([data.data]) # Convert single data dict to DataFrame + new_markdown = new_df.to_markdown(index=False) + print(f"[SaveFile] 💾 Writing Data to markdown file: exists={file_exists}, mode={'append' if file_exists else 'create'}", flush=True) + print(f"[SaveFile] 💾 Markdown data to add: {data.data}", flush=True) + + if file_exists: + # Read existing markdown and append new table + try: + with path.open("r", encoding="utf-8") as f: + existing_content = f.read() + + # Append new markdown table with separator + with path.open("a", encoding="utf-8") as f: + # Ensure file ends with newline + if existing_content and not existing_content.endswith("\n"): + f.write("\n") + f.write(f"\n{new_markdown}\n") + print(f"[SaveFile] 💾 ✅ Appended markdown table to existing file: {path}", flush=True) + except Exception as e: + print(f"[SaveFile] ⚠️ Error reading existing markdown: {e}, creating new file", flush=True) + path.write_text(f"{new_markdown}\n", encoding="utf-8") + print(f"[SaveFile] 💾 ✅ Created new markdown file: {path}", flush=True) + else: + path.write_text(f"{new_markdown}\n", encoding="utf-8") + print(f"[SaveFile] 💾 ✅ Created new markdown file: {path}", flush=True) else: msg = f"Unsupported Data format: {fmt}" raise ValueError(msg) @@ -350,24 +584,145 @@ def _save_data(self, data: Data, path: Path, fmt: str) -> str: async def _save_message(self, message: Message, path: Path, fmt: str) -> str: """Save a Message to the specified file format, handling async iterators.""" + print(f"[SaveFile] 💾 _save_message called: format={fmt}, path={path}", flush=True) + print(f"[SaveFile] 💾 message.text type: {type(message.text).__name__}, value={repr(message.text)[:200]}", flush=True) + content = "" if message.text is None: content = "" + print(f"[SaveFile] 💾 message.text is None, content=''", flush=True) elif isinstance(message.text, AsyncIterator): + print(f"[SaveFile] 💾 message.text is AsyncIterator, extracting...", flush=True) async for item in message.text: content += str(item) + " " content = content.strip() + print(f"[SaveFile] 💾 Extracted from AsyncIterator: length={len(content)}, content={repr(content)[:200]}", flush=True) elif isinstance(message.text, Iterator): + print(f"[SaveFile] 💾 message.text is Iterator, extracting...", flush=True) content = " ".join(str(item) for item in message.text) + print(f"[SaveFile] 💾 Extracted from Iterator: length={len(content)}, content={repr(content)[:200]}", flush=True) else: content = str(message.text) + print(f"[SaveFile] 💾 Converted to string: length={len(content)}, content={repr(content)[:200]}", flush=True) + + # CRITICAL: Skip if content is empty (don't fail, just skip writing) + # This prevents writing empty files which indicates a problem in the workflow + print(f"[SaveFile] 💾 Final content: type={type(content).__name__}, length={len(content) if isinstance(content, str) else 'N/A'}, value={repr(content)[:200]}", flush=True) + if not content or (isinstance(content, str) and content.strip() == ""): + print(f"[SaveFile] ⚠️ SKIP: Content is empty, skipping write. message.text={repr(message.text)}, content={repr(content)}", flush=True) + logger.info(f"[SaveFile] Skipping write to '{path}' - content is empty") + return f"Skipped writing empty content to '{path}'" if fmt == "txt": - path.write_text(content, encoding="utf-8") + # Append mode: if file exists, append with newline; otherwise create new + file_exists = path.exists() + print(f"[SaveFile] 💾 Writing to txt file: exists={file_exists}, mode={'append' if file_exists else 'create'}", flush=True) + if file_exists: + # Ensure file ends with newline before appending + with path.open("r+", encoding="utf-8") as f: + f.seek(0, 2) # Go to end + file_size = f.tell() + if file_size > 0: + f.seek(-1, 2) # Go back one byte + last_char = f.read(1) + if last_char != "\n": + f.write("\n") # Add newline if missing + # Append new content + f.write(f"{content}\n") + print(f"[SaveFile] 💾 ✅ Appended content to existing file: {path}", flush=True) + else: + path.write_text(f"{content}\n", encoding="utf-8") + print(f"[SaveFile] 💾 ✅ Created new file with content: {path}", flush=True) elif fmt == "json": - path.write_text(json.dumps({"message": content}, indent=2), encoding="utf-8") + # For JSON, use JSON array format for valid JSON file + file_exists = path.exists() + new_item = {"message": content} + print(f"[SaveFile] 💾 Writing to json file: exists={file_exists}, mode={'append to array' if file_exists else 'create array'}", flush=True) + print(f"[SaveFile] 💾 JSON content to add: {json.dumps(new_item)[:200]}", flush=True) + + if file_exists: + # Read existing file and parse as JSON array + try: + with path.open("r", encoding="utf-8") as f: + existing_content = f.read().strip() + + json_array = [] + + # Try to parse as JSON array + if existing_content.startswith("["): + try: + json_array = json.loads(existing_content) + print(f"[SaveFile] 🔄 Read existing JSON array with {len(json_array)} item(s)", flush=True) + except json.JSONDecodeError as e: + print(f"[SaveFile] ⚠️ Error parsing JSON array: {e}, creating new array", flush=True) + json_array = [] + + # Try to parse as single JSON object (convert to array) + elif existing_content.startswith("{"): + try: + obj = json.loads(existing_content) + json_array = [obj] + print(f"[SaveFile] 🔄 Converted single JSON object to array", flush=True) + except json.JSONDecodeError: + print(f"[SaveFile] ⚠️ Error parsing JSON object: {e}, creating new array", flush=True) + json_array = [] + + # Try to parse as JSONL (one object per line) + else: + lines = existing_content.split("\n") + for line in lines: + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + json_array.append(obj) + except json.JSONDecodeError: + pass + if json_array: + print(f"[SaveFile] 🔄 Converted JSONL format to array with {len(json_array)} item(s)", flush=True) + + # Add new item to array + json_array.append(new_item) + + # Write back as formatted JSON array + with path.open("w", encoding="utf-8") as f: + f.write(json.dumps(json_array, indent=2, ensure_ascii=False)) + print(f"[SaveFile] 💾 ✅ Appended to JSON array (now {len(json_array)} items): {path}", flush=True) + + except Exception as e: + print(f"[SaveFile] ⚠️ Error reading existing file: {e}, creating new array", flush=True) + # Create new array with the item + json_array = [new_item] + with path.open("w", encoding="utf-8") as f: + f.write(json.dumps(json_array, indent=2, ensure_ascii=False)) + print(f"[SaveFile] 💾 ✅ Created new JSON array file: {path}", flush=True) + else: + # Create new file with JSON array format + json_array = [new_item] + with path.open("w", encoding="utf-8") as f: + f.write(json.dumps(json_array, indent=2, ensure_ascii=False)) + print(f"[SaveFile] 💾 ✅ Created new JSON array file with content: {path}", flush=True) elif fmt == "markdown": - path.write_text(f"**Message:**\n\n{content}", encoding="utf-8") + # Append mode for markdown + file_exists = path.exists() + print(f"[SaveFile] 💾 Writing to markdown file: exists={file_exists}, mode={'append' if file_exists else 'create'}", flush=True) + if file_exists: + # Ensure file ends with newline before appending + with path.open("r+", encoding="utf-8") as f: + f.seek(0, 2) # Go to end + file_size = f.tell() + if file_size > 0: + f.seek(-1, 2) # Go back one byte + last_char = f.read(1) + if last_char != "\n": + f.write("\n") # Add newline if missing + # Append new content with separator + f.write(f"\n**Message:**\n\n{content}\n") + print(f"[SaveFile] 💾 ✅ Appended markdown content to existing file: {path}", flush=True) + else: + path.write_text(f"**Message:**\n\n{content}\n", encoding="utf-8") + print(f"[SaveFile] 💾 ✅ Created new markdown file with content: {path}", flush=True) else: msg = f"Unsupported Message format: {fmt}" raise ValueError(msg) @@ -395,13 +750,33 @@ def _get_file_format_for_location(self, location: str) -> str: async def _save_to_local(self) -> Message: """Save file to local storage (original functionality).""" file_format = self._get_file_format_for_location("Local") + input_type = self._get_input_type() + + # Log what we're receiving + print(f"[SaveFile] 📥 RECEIVED INPUT: type={input_type}, format={file_format}, file_name={self.file_name}", flush=True) + print(f"[SaveFile] 📥 Input object: {type(self.input).__name__}", flush=True) + + # Log the actual content based on type + if input_type == "Message": + msg_text = getattr(self.input, "text", None) + msg_data = getattr(self.input, "data", None) + print(f"[SaveFile] 📥 Message.text: type={type(msg_text).__name__}, value={repr(msg_text)[:200]}", flush=True) + print(f"[SaveFile] 📥 Message.data: {repr(msg_data)[:200] if msg_data else 'None'}", flush=True) + elif input_type == "Data": + data_text = getattr(self.input, "text", None) + data_dict = getattr(self.input, "data", None) + print(f"[SaveFile] 📥 Data.text: {repr(data_text)[:200] if data_text else 'None'}", flush=True) + print(f"[SaveFile] 📥 Data.data: {repr(data_dict)[:200] if data_dict else 'None'}", flush=True) + elif input_type == "DataFrame": + print(f"[SaveFile] 📥 DataFrame shape: {self.input.shape if hasattr(self.input, 'shape') else 'N/A'}", flush=True) + print(f"[SaveFile] 📥 DataFrame preview: {str(self.input.head())[:200] if hasattr(self.input, 'head') else 'N/A'}", flush=True) # Validate file format based on input type allowed_formats = ( - self.LOCAL_MESSAGE_FORMAT_CHOICES if self._get_input_type() == "Message" else self.LOCAL_DATA_FORMAT_CHOICES + self.LOCAL_MESSAGE_FORMAT_CHOICES if input_type == "Message" else self.LOCAL_DATA_FORMAT_CHOICES ) if file_format not in allowed_formats: - msg = f"Invalid file format '{file_format}' for {self._get_input_type()}. Allowed: {allowed_formats}" + msg = f"Invalid file format '{file_format}' for {input_type}. Allowed: {allowed_formats}" raise ValueError(msg) # Prepare file path @@ -409,17 +784,22 @@ async def _save_to_local(self) -> Message: if not file_path.parent.exists(): file_path.parent.mkdir(parents=True, exist_ok=True) file_path = self._adjust_file_path_with_format(file_path, file_format) + + print(f"[SaveFile] 📁 Target file path: {file_path}", flush=True) + print(f"[SaveFile] 📁 File exists: {file_path.exists()}", flush=True) # Save the input to file based on type - if self._get_input_type() == "DataFrame": + if input_type == "DataFrame": confirmation = self._save_dataframe(self.input, file_path, file_format) - elif self._get_input_type() == "Data": + elif input_type == "Data": confirmation = self._save_data(self.input, file_path, file_format) - elif self._get_input_type() == "Message": + elif input_type == "Message": confirmation = await self._save_message(self.input, file_path, file_format) else: - msg = f"Unsupported input type: {self._get_input_type()}" + msg = f"Unsupported input type: {input_type}" raise ValueError(msg) + + print(f"[SaveFile] ✅ Save result: {confirmation}", flush=True) # Upload the saved file await self._upload_file(file_path) From 7e7073b644e46348dd120521d00b61f17fd5ed9e Mon Sep 17 00:00:00 2001 From: Ahmed Ali Date: Fri, 28 Nov 2025 10:28:27 +0100 Subject: [PATCH 2/3] fix: save to file tool --- lfx/src/lfx/components/data/save_file.py | 22 ++++++++++++++++++---- src/tool_executor/api.py | 7 ------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/lfx/src/lfx/components/data/save_file.py b/lfx/src/lfx/components/data/save_file.py index b40d5e4..0449d4f 100644 --- a/lfx/src/lfx/components/data/save_file.py +++ b/lfx/src/lfx/components/data/save_file.py @@ -427,8 +427,12 @@ def _save_data(self, data: Data, path: Path, fmt: str) -> str: ) if is_text_empty and is_data_empty: - print(f"[SaveFile] ⚠️ SKIP: Data content is empty, skipping write. data.data={repr(data.data)}, text={repr(text_content)}", flush=True) + print( + f"[SaveFile] ⚠️ SKIP: Data content is empty, skipping write. data.data={repr(data.data)}, text={repr(text_content)}", + flush=True, + ) logger.info(f"[SaveFile] Skipping write to '{path}' - content is empty") + setattr(self, "_skipped_last_write", True) return f"Skipped writing empty content to '{path}'" file_exists = path.exists() @@ -609,8 +613,12 @@ async def _save_message(self, message: Message, path: Path, fmt: str) -> str: # This prevents writing empty files which indicates a problem in the workflow print(f"[SaveFile] 💾 Final content: type={type(content).__name__}, length={len(content) if isinstance(content, str) else 'N/A'}, value={repr(content)[:200]}", flush=True) if not content or (isinstance(content, str) and content.strip() == ""): - print(f"[SaveFile] ⚠️ SKIP: Content is empty, skipping write. message.text={repr(message.text)}, content={repr(content)}", flush=True) + print( + f"[SaveFile] ⚠️ SKIP: Content is empty, skipping write. message.text={repr(message.text)}, content={repr(content)}", + flush=True, + ) logger.info(f"[SaveFile] Skipping write to '{path}' - content is empty") + setattr(self, "_skipped_last_write", True) return f"Skipped writing empty content to '{path}'" if fmt == "txt": @@ -788,6 +796,9 @@ async def _save_to_local(self) -> Message: print(f"[SaveFile] 📁 Target file path: {file_path}", flush=True) print(f"[SaveFile] 📁 File exists: {file_path.exists()}", flush=True) + # Track whether we skipped writing due to empty content + setattr(self, "_skipped_last_write", False) + # Save the input to file based on type if input_type == "DataFrame": confirmation = self._save_dataframe(self.input, file_path, file_format) @@ -801,8 +812,11 @@ async def _save_to_local(self) -> Message: print(f"[SaveFile] ✅ Save result: {confirmation}", flush=True) - # Upload the saved file - await self._upload_file(file_path) + # Upload the saved file only if we actually wrote content + if not getattr(self, "_skipped_last_write", False): + await self._upload_file(file_path) + else: + print("[SaveFile] ⏭️ Upload skipped because no content was written.", flush=True) # Return the final file path and confirmation message final_path = Path.cwd() / file_path if not file_path.is_absolute() else file_path diff --git a/src/tool_executor/api.py b/src/tool_executor/api.py index 4c92958..e4f4ffb 100644 --- a/src/tool_executor/api.py +++ b/src/tool_executor/api.py @@ -757,13 +757,6 @@ async def execute_component(request: ExecutionRequest) -> ExecutionResponse: deserialized_inputs["tools"] = sanitized_tools _summarize_parameters("parameters/final", component_params) - - # DEBUG: Log api_key before instantiation for AgentQL - if request.component_state.component_class == "AgentQL" and "api_key" in component_params: - api_key_val = component_params.get("api_key") - print(f"[EXECUTOR] 🎯 AgentQL api_key in component_params BEFORE instantiation: {repr(api_key_val)}", flush=True) - logger.info(f"[EXECUTOR] 🎯 AgentQL api_key in component_params: {repr(api_key_val)}") - component = component_class(**component_params) # DEBUG: Verify api_key is set on component instance From 03a372327f946883d2dc78aea994738a6c407ecb Mon Sep 17 00:00:00 2001 From: Ahmed Ali Date: Thu, 4 Dec 2025 14:15:32 +0100 Subject: [PATCH 3/3] fix: use node.json instead of old `components.json` --- TEST_STATUS.md | 126 ------- pyproject.toml | 6 +- src/{tool_executor => node}/__init__.py | 0 src/{tool_executor => node}/api.py | 427 +++++++++++++++++------- src/{tool_executor => node}/main.py | 2 +- src/{tool_executor => node}/nats.py | 28 +- 6 files changed, 324 insertions(+), 265 deletions(-) delete mode 100644 TEST_STATUS.md rename src/{tool_executor => node}/__init__.py (100%) rename src/{tool_executor => node}/api.py (74%) rename src/{tool_executor => node}/main.py (96%) rename src/{tool_executor => node}/nats.py (90%) diff --git a/TEST_STATUS.md b/TEST_STATUS.md deleted file mode 100644 index bd66b74..0000000 --- a/TEST_STATUS.md +++ /dev/null @@ -1,126 +0,0 @@ -# Test Status and CI Configuration - -## 🚀 Current Status: CI-Friendly Configuration - -✅ **Tests now pass successfully in CI environment** - -- **579 tests passing** (99% success rate) -- **6 expected skips** -- **1 expected failure** -- **Total runtime**: ~11 seconds - -## Overview - -This document describes the current status of the test suite and the tests that are temporarily skipped to keep CI green. - -## Test Suite Configuration - -Tests are configured in `pyproject.toml` under the `[tool.pytest.ini_options]` section. Some tests are currently ignored due to known issues that need to be addressed. - -## Skipped Tests for CI - -### 1. Integration Tests (External Dependencies) -These tests depend on external components and infrastructure that may not be available in CI environments: - -- `lfx/tests/unit/cli/test_run_real_flows.py` -- `lfx/tests/unit/cli/test_run_starter_projects.py` -- `lfx/tests/unit/cli/test_run_starter_projects_backward_compatibility.py` - -### 2. Executor Node Connectivity Issues -These tests fail due to executor node connectivity problems in the distributed runtime environment: - -- `lfx/tests/unit/cli/test_script_loader.py::TestIntegrationWithRealFlows::test_execute_real_flow_with_results` -- `lfx/tests/unit/cli/test_serve_app.py::TestServeAppEndpoints::test_run_endpoint_success` -- `lfx/tests/unit/cli/test_serve_app.py::TestServeAppEndpoints::test_run_endpoint_query_auth` -- `lfx/tests/unit/cli/test_serve_app.py::TestServeAppEndpoints::test_flow_run_endpoint_multi_flow` -- `lfx/tests/unit/cli/test_serve_app.py::TestServeAppEndpoints::test_flow_execution_with_message_output` -- `lfx/tests/unit/custom/custom_component/test_component_events.py::test_component_build_results` - -**Error Pattern**: `RuntimeError: Failed to call executor node: All connection attempts failed` - -**Root Cause**: These tests require a running executor node instance that isn't available in the CI environment. - -### 3. State Model and Pydantic Compatibility Issues -These tests fail due to Pydantic v2 compatibility issues, particularly around field handling and return type annotations: - -- `lfx/tests/unit/graph/graph/state/test_state_model.py::TestCreateStateModel::test_create_model_with_valid_return_type_annotations` -- `lfx/tests/unit/graph/graph/state/test_state_model.py::TestCreateStateModel::test_create_model_and_assign_values_fails` -- `lfx/tests/unit/graph/graph/state/test_state_model.py::TestCreateStateModel::test_create_with_multiple_components` -- `lfx/tests/unit/graph/graph/state/test_state_model.py::TestCreateStateModel::test_create_with_pydantic_field` -- `lfx/tests/unit/graph/graph/state/test_state_model.py::TestCreateStateModel::test_graph_functional_start_state_update` - -**Error Pattern**: Issues with Pydantic field validation, model creation, and return type annotations. - -### 4. Graph Execution Issues -These tests fail due to problems in graph execution and cycle detection: - -- `lfx/tests/unit/graph/graph/test_base.py::test_graph_with_edge` -- `lfx/tests/unit/graph/graph/test_base.py::test_graph_functional` -- `lfx/tests/unit/graph/graph/test_base.py::test_graph_functional_async_start` -- `lfx/tests/unit/graph/graph/test_base.py::test_graph_functional_start_end` -- `lfx/tests/unit/graph/graph/test_cycles.py::test_cycle_in_graph_max_iterations` -- `lfx/tests/unit/graph/graph/test_cycles.py::test_conditional_router_max_iterations` -- `lfx/tests/unit/graph/graph/test_graph_state_model.py::test_graph_functional_start_graph_state_update` -- `lfx/tests/unit/graph/graph/test_graph_state_model.py::test_graph_state_model_serialization` - -**Error Pattern**: Graph execution failures, state management issues, and cycle detection problems. - -## Current Test Statistics - -- **Total Tests**: 586 (after excluding problematic modules) -- **Passing Tests**: 579 (~99%) -- **Skipped Tests**: 6 (expected skips) -- **Expected Failures**: 1 - -**CI Status**: ✅ PASSING - -## Warnings - -The test suite generates warnings (3,152 in current run), primarily related to: - -1. **Pydantic Deprecation Warnings**: Usage of deprecated `json_encoders`, `model_fields` access patterns, and model validator configurations. -2. **Resource Warnings**: Potential memory leaks and resource management issues. -3. **Collection Warnings**: Test class constructor issues. - -## Action Items - -To restore full test coverage, the following issues need to be addressed: - -### High Priority -1. **Fix Executor Node Connectivity**: Resolve the "All connection attempts failed" error for distributed runtime tests. -2. **Pydantic Compatibility**: Update code to use Pydantic v2 compatible APIs and patterns. -3. **Reduce Warnings**: Address deprecated API usage and resource management issues. - -### Medium Priority -1. **Graph Execution**: Fix graph execution and state management issues. -2. **Test Environment**: Set up proper test infrastructure for integration tests. - -## Running Tests - -To run the tests locally: - -```bash -# Activate virtual environment -source .venv/bin/activate - -# Run all tests (excluding the skipped ones) -python -m pytest - -# Run with verbose output -python -m pytest -v - -# Run specific test files -python -m pytest lfx/tests/unit/cli/test_common.py - -# Run with coverage -python -m pytest --cov=lfx -``` - -## CI Status - -With the current configuration, CI should pass with approximately 638 passing tests. The skipped tests are temporarily excluded to maintain CI stability while the underlying issues are being addressed. - ---- - -**Last Updated**: 2025-11-25 -**Contact**: For questions about test status, please open an issue in the repository. \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 3eb037f..a1f102f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -82,15 +82,15 @@ Documentation = "https://github.com/droq-ai/lfx-tool-executor-node#readme" "Bug Tracker" = "https://github.com/droq-ai/lfx-tool-executor-node/issues" [project.scripts] -lfx-tool-executor-node = "tool_executor.main:main" +lfx-tool-executor-node = "node.main:main" [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] -packages = ["src/tool_executor"] -include = ["src/tool_executor/**/*", "lfx/**/*", "components.json"] +packages = ["src/node"] +include = ["src/node/**/*", "lfx/**/*", "node.json"] [tool.uv.sources] lfx = { path = "lfx" } diff --git a/src/tool_executor/__init__.py b/src/node/__init__.py similarity index 100% rename from src/tool_executor/__init__.py rename to src/node/__init__.py diff --git a/src/tool_executor/api.py b/src/node/api.py similarity index 74% rename from src/tool_executor/api.py rename to src/node/api.py index 4c92958..b424ee5 100644 --- a/src/tool_executor/api.py +++ b/src/node/api.py @@ -2,13 +2,13 @@ import asyncio import importlib -import inspect import json import logging import os import sys import time import uuid +from datetime import UTC from typing import Any from fastapi import FastAPI, HTTPException @@ -49,7 +49,7 @@ def _mask_sensitive_value(key: str, value: Any) -> Any: return "*" * len(value) return f"{value[:4]}...{value[-4:]} (len={len(value)})" return value - if isinstance(value, (dict, list)): + if isinstance(value, dict | list): return f"<{type(value).__name__}:{len(value)}>" return value @@ -66,7 +66,7 @@ def _has_meaningful_value(value: Any) -> bool: return False if isinstance(value, str): return value.strip() != "" - if isinstance(value, (list, tuple, set, dict)): + if isinstance(value, list | tuple | set | dict): return len(value) > 0 return True @@ -98,23 +98,55 @@ def _merge_runtime_inputs( return (applied, skipped_empty) + # Load component mapping from JSON file -_components_json_path = os.path.join(_node_dir, "components.json") +_components_json_path = os.path.join(_node_dir, "node.json") _component_map: dict[str, str] = {} -print(f"[EXECUTOR] Looking for components.json at: {_components_json_path}") +print(f"[EXECUTOR] Looking for node.json at: {_components_json_path}") print(f"[EXECUTOR] Node dir: {_node_dir}") if os.path.exists(_components_json_path): try: - with open(_components_json_path, "r") as f: - _component_map = json.load(f) - print(f"[EXECUTOR] ✅ Loaded {len(_component_map)} component mappings from {_components_json_path}") - logger.info(f"Loaded {len(_component_map)} component mappings from {_components_json_path}") + with open(_components_json_path) as f: + node_data = json.load(f) + # Extract components mapping from node.json structure + # node.json has structure: {"components": {"ComponentName": {"path": "...", ...}, ...}} + # Paths in node.json incorrectly have format "lfx.src.lfx.components..." + # but should be "lfx.components..." (matching old components.json format) + if "components" in node_data and isinstance(node_data["components"], dict): + _component_map = {} + for component_name, component_info in node_data["components"].items(): + if isinstance(component_info, dict) and "path" in component_info: + path = component_info.get("path", "") + # Transform path: "lfx.src.lfx.components..." -> "lfx.components..." + # Remove the incorrect "lfx.src.lfx." prefix or "lfx.src." prefix + original_path = path + if path.startswith("lfx.src.lfx."): + path = "lfx." + path[len("lfx.src.lfx.") :] + elif path.startswith("lfx.src."): + path = "lfx." + path[len("lfx.src.") :] + if original_path != path: + logger.debug( + f"Transformed path for {component_name}: " f"{original_path} -> {path}" + ) + _component_map[component_name] = path + print( + f"[EXECUTOR] ✅ Loaded {len(_component_map)} component mappings " + f"from {_components_json_path}" + ) + logger.info( + f"Loaded {len(_component_map)} component mappings from {_components_json_path}" + ) + else: + logger.warning( + f"node.json does not contain 'components' key or invalid structure " + f"at {_components_json_path}" + ) except Exception as e: - print(f"[EXECUTOR] ❌ Failed to load components.json: {e}") - logger.warning(f"Failed to load components.json: {e}") + print(f"[EXECUTOR] ❌ Failed to load node.json: {e}") + logger.warning(f"Failed to load node.json: {e}") else: - print(f"[EXECUTOR] ❌ components.json not found at {_components_json_path}") - logger.warning(f"components.json not found at {_components_json_path}") + print(f"[EXECUTOR] ❌ node.json not found at {_components_json_path}") + logger.warning(f"node.json not found at {_components_json_path}") app = FastAPI(title="Langflow Executor Node", version="0.1.0") @@ -127,7 +159,8 @@ async def get_nats_client(): global _nats_client if _nats_client is None: logger.info("[NATS] Creating new NATS client instance...") - from tool_executor.nats import NATSClient + from node.nats import NATSClient + nats_url = os.getenv("NATS_URL", "nats://localhost:4222") logger.info(f"[NATS] Connecting to NATS at {nats_url}") _nats_client = NATSClient(nats_url=nats_url) @@ -135,7 +168,10 @@ async def get_nats_client(): await _nats_client.connect() logger.info("[NATS] ✅ Successfully connected to NATS") except Exception as e: - logger.warning(f"[NATS] ❌ Failed to connect to NATS (non-critical): {e}", exc_info=True) + logger.warning( + f"[NATS] ❌ Failed to connect to NATS (non-critical): {e}", + exc_info=True, + ) _nats_client = None else: logger.debug("[NATS] Using existing NATS client instance") @@ -194,11 +230,18 @@ async def load_component_class( Raises: HTTPException: If module or class cannot be loaded """ - # If module path is wrong (validation wrapper), try to find the correct module from components.json + # If module path is wrong (validation wrapper), try to find the correct module + # from node.json if module_name in ("lfx.custom.validate", "lfx.custom.custom_component.component"): - print(f"[EXECUTOR] Module path is incorrect ({module_name}), looking up {class_name} in components.json (map size: {len(_component_map)})") - logger.info(f"Module path is incorrect ({module_name}), looking up correct module for {class_name} in components.json") - + print( + f"[EXECUTOR] Module path is incorrect ({module_name}), " + f"looking up {class_name} in node.json (map size: {len(_component_map)})" + ) + logger.info( + f"Module path is incorrect ({module_name}), " + f"looking up correct module for {class_name} in node.json" + ) + # Look up the correct module path from the JSON mapping if class_name in _component_map: correct_module = _component_map[class_name] @@ -212,7 +255,9 @@ async def load_component_class( return component_class except (ImportError, AttributeError) as e: print(f"[EXECUTOR] ❌ Failed to load {class_name} from {correct_module}: {e}") - logger.warning(f"Failed to load {class_name} from mapped module {correct_module}: {e}") + logger.warning( + f"Failed to load {class_name} from mapped module " f"{correct_module}: {e}" + ) # Fall back to code execution if module import fails if component_code: print(f"[EXECUTOR] Falling back to code execution for {class_name}") @@ -223,9 +268,12 @@ async def load_component_class( logger.error(f"Code execution also failed for {class_name}: {code_error}") # Continue to next fallback attempt else: - print(f"[EXECUTOR] ❌ Component {class_name} not found in components.json (available: {list(_component_map.keys())[:5]}...)") - logger.warning(f"Component {class_name} not found in components.json mapping") - + print( + f"[EXECUTOR] ❌ Component {class_name} not found in node.json " + f"(available: {list(_component_map.keys())[:5]}...)" + ) + logger.warning(f"Component {class_name} not found in node.json mapping") + # First try loading from the provided module path try: module = importlib.import_module(module_name) @@ -238,9 +286,7 @@ async def load_component_class( if component_code: logger.info(f"Attempting to load {class_name} from provided code") return await load_component_from_code(component_code, class_name) - raise HTTPException( - status_code=400, detail=f"Failed to import module {module_name}: {e}" - ) + raise HTTPException(status_code=400, detail=f"Failed to import module {module_name}: {e}") except AttributeError as e: logger.warning(f"Class {class_name} not found in module {module_name}: {e}") # If class not found and we have code, try executing code @@ -297,12 +343,13 @@ async def load_component_from_code(component_code: str, class_name: str) -> type namespace = { "__builtins__": __builtins__, } - + # Try to import common Langflow modules into the namespace try: import lfx.base.io.text import lfx.io import lfx.schema.message + namespace["lfx"] = __import__("lfx") namespace["lfx.base"] = __import__("lfx.base") namespace["lfx.base.io"] = __import__("lfx.base.io") @@ -312,14 +359,13 @@ async def load_component_from_code(component_code: str, class_name: str) -> type namespace["lfx.schema.message"] = lfx.schema.message except Exception as import_error: logger.warning(f"Could not pre-import some modules: {import_error}") - + exec(compile(component_code, "", "exec"), namespace) - + if class_name not in namespace: # Log what classes are available in the namespace available_classes = [ - k for k, v in namespace.items() - if isinstance(v, type) and not k.startswith("_") + k for k, v in namespace.items() if isinstance(v, type) and not k.startswith("_") ] logger.error( f"Class {class_name} not found in provided code. " @@ -332,15 +378,13 @@ async def load_component_from_code(component_code: str, class_name: str) -> type f"Available classes: {', '.join(available_classes[:5])}" ), ) - + component_class = namespace[class_name] logger.info(f"Successfully loaded {class_name} from provided code") return component_class except SyntaxError as e: logger.error(f"Syntax error in component code: {e}") - raise HTTPException( - status_code=400, detail=f"Syntax error in component code: {e}" - ) + raise HTTPException(status_code=400, detail=f"Syntax error in component code: {e}") except Exception as e: logger.error(f"Error executing component code: {e}") raise HTTPException( @@ -361,7 +405,7 @@ def serialize_result(result: Any) -> Any: # Handle None if result is None: return None - + # Handle LangChain Tool objects FIRST - explicitly preserve metadata if isinstance(result, BaseTool): tool_name = getattr(result, "name", "unknown") @@ -378,30 +422,50 @@ def serialize_result(result: Any) -> Any: "name": getattr(result, "name", ""), "description": getattr(result, "description", ""), } - + # CRITICAL: Explicitly include metadata (model_dump might not include it) if hasattr(result, "metadata") and result.metadata: - print(f"[SERIALIZE_RESULT] 🔧 Tool '{tool_name}' has metadata: {list(result.metadata.keys())}", flush=True) + print( + f"[SERIALIZE_RESULT] 🔧 Tool '{tool_name}' has metadata: " + f"{list(result.metadata.keys())}", + flush=True, + ) if "_component_state" in result.metadata: comp_state = result.metadata["_component_state"] if isinstance(comp_state, dict) and "parameters" in comp_state: params = comp_state["parameters"] api_key_val = params.get("api_key") if isinstance(params, dict) else None - print(f"[SERIALIZE_RESULT] 🎯 Tool '{tool_name}' _component_state['parameters']['api_key'] = {repr(api_key_val)}", flush=True) + print( + f"[SERIALIZE_RESULT] 🎯 Tool '{tool_name}' " + f"_component_state['parameters']['api_key'] = {repr(api_key_val)}", + flush=True, + ) tool_dict["metadata"] = serialize_result(result.metadata) else: print(f"[SERIALIZE_RESULT] ⚠️ Tool '{tool_name}' has NO metadata!", flush=True) tool_dict["metadata"] = {} - + # Recursively serialize all values serialized = {k: serialize_result(v) for k, v in tool_dict.items()} - print(f"[SERIALIZE_RESULT] ✅ Serialized Tool '{tool_name}': metadata keys = {list(serialized.get('metadata', {}).keys())}", flush=True) + print( + f"[SERIALIZE_RESULT] ✅ Serialized Tool '{tool_name}': metadata keys = " + f"{list(serialized.get('metadata', {}).keys())}", + flush=True, + ) if "_component_state" in serialized.get("metadata", {}): - print(f"[SERIALIZE_RESULT] ✅ Tool '{tool_name}' _component_state preserved in serialized result!", flush=True) + print( + f"[SERIALIZE_RESULT] ✅ Tool '{tool_name}' _component_state " + f"preserved in serialized result!", + flush=True, + ) return serialized except Exception as exc: - print(f"[SERIALIZE_RESULT] ❌ Failed to serialize tool '{tool_name}': {exc}", flush=True) + print( + f"[SERIALIZE_RESULT] ❌ Failed to serialize tool '{tool_name}': {exc}", + flush=True, + ) import traceback + print(f"[SERIALIZE_RESULT] Traceback: {traceback.format_exc()}", flush=True) logger.warning(f"Failed to serialize tool '{tool_name}': {exc}") # Fallback: return minimal representation with metadata @@ -410,29 +474,29 @@ def serialize_result(result: Any) -> Any: "description": getattr(result, "description", ""), "metadata": serialize_result(getattr(result, "metadata", {})), } - + # Handle primitive types - if isinstance(result, (str, int, float, bool)): + if isinstance(result, str | int | float | bool): return result - + # Skip type/metaclass objects - they can't be serialized if isinstance(result, type): # Return the class name as a string representation return f"" - + # Check for Pydantic metaclass specifically result_type_str = str(type(result)) if "ModelMetaclass" in result_type_str or "metaclass" in result_type_str.lower(): return f"" - + # Handle lists/tuples first (before other checks) - if isinstance(result, (list, tuple)): + if isinstance(result, list | tuple): return [serialize_result(item) for item in result] - + # Handle dicts if isinstance(result, dict): return {k: serialize_result(v) for k, v in result.items()} - + # Handle common Langflow types (Pydantic models) if hasattr(result, "model_dump"): try: @@ -450,7 +514,7 @@ def serialize_result(result: Any) -> Any: except Exception as e: logger.debug(f"dict() failed: {e}") pass - + # Try to serialize via __dict__ (but skip private attributes and classes) if hasattr(result, "__dict__"): try: @@ -467,11 +531,11 @@ def serialize_result(result: Any) -> Any: except Exception as e: logger.debug(f"__dict__ serialization failed: {e}") pass - + # For callable objects (functions, methods), return string representation if callable(result): return f"" - + # Last resort: try to convert to string try: return str(result) @@ -482,10 +546,10 @@ def serialize_result(result: Any) -> Any: def deserialize_input_value(value: Any) -> Any: """ Deserialize input value, reconstructing Langflow types from dicts. - + Args: value: Serialized input value (may be a dict representing Data/Message) - + Returns: Deserialized value with proper types reconstructed """ @@ -494,28 +558,41 @@ def deserialize_input_value(value: Any) -> Any: if isinstance(value, list): return [deserialize_input_value(item) for item in value] return value - + # Try to reconstruct Data or Message objects try: - from lfx.schema.message import Message from lfx.schema.data import Data - + from lfx.schema.message import Message + # Check if it looks like a Message (has Message-specific fields) - # Message extends Data, so it has text_key, data, and Message-specific fields like sender, category, duration, etc. - message_fields = ["sender", "category", "session_id", "timestamp", "duration", "flow_id", "error", "edit", "sender_name", "context_id"] + # Message extends Data, so it has text_key, data, and Message-specific fields + # like sender, category, duration, etc. + message_fields = [ + "sender", + "category", + "session_id", + "timestamp", + "duration", + "flow_id", + "error", + "edit", + "sender_name", + "context_id", + ] has_message_fields = any(key in value for key in message_fields) - + # Also check inside data dict (Message fields might be nested there) data_dict = value.get("data", {}) if isinstance(data_dict, dict): has_message_fields_in_data = any(key in data_dict for key in message_fields) has_message_fields = has_message_fields or has_message_fields_in_data - + if has_message_fields: # Fix timestamp format if present (convert various formats to YYYY-MM-DD HH:MM:SS UTC) if "timestamp" in value and isinstance(value["timestamp"], str): timestamp = value["timestamp"] - # Convert ISO format with T separator to space (e.g., "2025-11-14T13:09:23 UTC" -> "2025-11-14 13:09:23 UTC") + # Convert ISO format with T separator to space + # (e.g., "2025-11-14T13:09:23 UTC" -> "2025-11-14 13:09:23 UTC") if "T" in timestamp: # Replace T with space, but preserve the UTC part timestamp = timestamp.replace("T", " ") @@ -530,13 +607,14 @@ def deserialize_input_value(value: Any) -> Any: if not timestamp.endswith(" UTC") and not timestamp.endswith(" UTC"): # Try to parse and reformat using datetime try: - from datetime import datetime, timezone + from datetime import datetime + # Try common formats for fmt in ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S %Z"]: try: dt = datetime.strptime(timestamp.strip(), fmt) if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) + dt = dt.replace(tzinfo=UTC) timestamp = dt.strftime("%Y-%m-%d %H:%M:%S %Z") break except ValueError: @@ -544,37 +622,49 @@ def deserialize_input_value(value: Any) -> Any: except Exception: pass value["timestamp"] = timestamp - + # Create Message object - Message constructor will handle merging fields into data dict # according to Data.validate_data logic try: message_obj = Message(**value) - logger.debug(f"[DESERIALIZE] Successfully reconstructed Message object from dict with keys: {list(value.keys())}") + logger.debug( + f"[DESERIALIZE] Successfully reconstructed Message object from dict " + f"with keys: {list(value.keys())}" + ) return message_obj except Exception as msg_error: - logger.warning(f"[DESERIALIZE] Failed to create Message from dict: {msg_error}, keys: {list(value.keys())}") + logger.warning( + f"[DESERIALIZE] Failed to create Message from dict: {msg_error}, " + f"keys: {list(value.keys())}" + ) # Try to create with just the data dict if that exists if "data" in value and isinstance(value["data"], dict): try: - return Message(data=value["data"], **{k: v for k, v in value.items() if k != "data"}) + return Message( + data=value["data"], + **{k: v for k, v in value.items() if k != "data"}, + ) except Exception: pass raise - - # Check if it looks like a Data object (has text_key or data field, but not Message-specific fields) + + # Check if it looks like a Data object (has text_key or data field, + # but not Message-specific fields) if ("data" in value or "text_key" in value) and not has_message_fields: return Data(**value) - + except Exception as e: logger.debug(f"[DESERIALIZE] Could not reconstruct object from dict: {e}") # Return as-is if reconstruction fails pass - + # For dicts, recursively deserialize values return {k: deserialize_input_value(v) for k, v in value.items()} -def sanitize_tool_inputs(component_params: dict[str, Any], component_class: str | None = None) -> list[BaseTool] | None: +def sanitize_tool_inputs( + component_params: dict[str, Any], component_class: str | None = None +) -> list[BaseTool] | None: """Ensure `tools` parameter only contains LangChain tool objects. When components (especially agents) run in tool mode, the backend currently @@ -604,7 +694,8 @@ def sanitize_tool_inputs(component_params: dict[str, Any], component_class: str if invalid_types: logger.warning( - "[%s] Dropping %d invalid tool payload(s); expected LangChain BaseTool instances, got: %s", + "[%s] Dropping %d invalid tool payload(s); " + "expected LangChain BaseTool instances, got: %s", component_class or "Component", len(invalid_types), ", ".join(sorted(set(invalid_types))), @@ -633,7 +724,9 @@ def _tool_func(*args, **kwargs): return { "tool": name, "status": "unavailable", - "message": "Tool cannot execute inside executor context; please route to appropriate node.", + "message": ( + "Tool cannot execute inside executor context; " "please route to appropriate node." + ), } try: @@ -671,12 +764,12 @@ async def execute_component(request: ExecutionRequest) -> ExecutionResponse: f"Received execution request: " f"class={request.component_state.component_class}, " f"module={request.component_state.component_module}, " - f"code_length={len(request.component_state.component_code or '') if request.component_state.component_code else 0}, " + f"code_length={len(request.component_state.component_code or '') if request.component_state.component_code else 0}, " # noqa: E501 f"stream_topic={stream_topic_value}" ) logger.info(log_msg) print(f"[EXECUTOR] {log_msg}") # Also print to ensure visibility - + # Load component class dynamically component_class = await load_component_class( request.component_state.component_module, @@ -686,15 +779,19 @@ async def execute_component(request: ExecutionRequest) -> ExecutionResponse: # Instantiate component with parameters component_params = request.component_state.parameters.copy() - + # DEBUG: Log AgentQL API key if present if request.component_state.component_class == "AgentQL" and "api_key" in component_params: api_key_val = component_params.get("api_key") - print(f"[EXECUTOR] 🎯 AgentQL API KEY received in component_state.parameters: {repr(api_key_val)}", flush=True) + print( + f"[EXECUTOR] 🎯 AgentQL API KEY received in component_state.parameters: " + f"{repr(api_key_val)}", + flush=True, + ) logger.info(f"[EXECUTOR] 🎯 AgentQL API KEY received: {repr(api_key_val)}") - + _summarize_parameters("parameters/base", component_params) - + # Merge input_values (runtime values from upstream components) into parameters # These override static parameters since they contain the actual workflow data deserialized_inputs: dict[str, Any] = {} @@ -715,7 +812,7 @@ async def execute_component(request: ExecutionRequest) -> ExecutionResponse: applied, skipped, ) - + if request.component_state.config: # Merge config into parameters with _ prefix for key, value in request.component_state.config.items(): @@ -727,10 +824,18 @@ async def execute_component(request: ExecutionRequest) -> ExecutionResponse: list((request.component_state.input_values or {}).keys()), (request.component_state.input_values or {}).get("tools"), ) - if request.component_state.input_values and request.component_state.input_values.get("tools"): + if request.component_state.input_values and request.component_state.input_values.get( + "tools" + ): sample_tool = request.component_state.input_values["tools"][0] - logger.debug("[AgentComponent] Sample tool payload keys: %s", list(sample_tool.keys())) - logger.debug("[AgentComponent] Sample tool metadata: %s", sample_tool.get("metadata")) + logger.debug( + "[AgentComponent] Sample tool payload keys: %s", + list(sample_tool.keys()), + ) + logger.debug( + "[AgentComponent] Sample tool metadata: %s", + sample_tool.get("metadata"), + ) logger.info( f"Instantiating {request.component_state.component_class} " @@ -752,7 +857,9 @@ async def execute_component(request: ExecutionRequest) -> ExecutionResponse: component_params = filtered_params # Ensure `tools` parameter contains valid tool instances only - sanitized_tools = sanitize_tool_inputs(component_params, request.component_state.component_class) + sanitized_tools = sanitize_tool_inputs( + component_params, request.component_state.component_class + ) if sanitized_tools is not None and "tools" in deserialized_inputs: deserialized_inputs["tools"] = sanitized_tools @@ -761,25 +868,41 @@ async def execute_component(request: ExecutionRequest) -> ExecutionResponse: # DEBUG: Log api_key before instantiation for AgentQL if request.component_state.component_class == "AgentQL" and "api_key" in component_params: api_key_val = component_params.get("api_key") - print(f"[EXECUTOR] 🎯 AgentQL api_key in component_params BEFORE instantiation: {repr(api_key_val)}", flush=True) + print( + f"[EXECUTOR] 🎯 AgentQL api_key in component_params BEFORE instantiation: " + f"{repr(api_key_val)}", + flush=True, + ) logger.info(f"[EXECUTOR] 🎯 AgentQL api_key in component_params: {repr(api_key_val)}") component = component_class(**component_params) - + # DEBUG: Verify api_key is set on component instance if request.component_state.component_class == "AgentQL": if hasattr(component, "api_key"): api_key_attr = getattr(component, "api_key", None) - print(f"[EXECUTOR] 🎯 AgentQL component.api_key attribute AFTER instantiation: {repr(api_key_attr)}", flush=True) - logger.info(f"[EXECUTOR] 🎯 AgentQL component.api_key attribute: {repr(api_key_attr)}") + print( + f"[EXECUTOR] 🎯 AgentQL component.api_key attribute AFTER instantiation: " + f"{repr(api_key_attr)}", + flush=True, + ) + logger.info( + f"[EXECUTOR] 🎯 AgentQL component.api_key attribute: " f"{repr(api_key_attr)}" + ) else: - print(f"[EXECUTOR] ⚠️ AgentQL component has NO api_key attribute after instantiation!", flush=True) + print( + "[EXECUTOR] ⚠️ AgentQL component has NO api_key attribute " + "after instantiation!", + flush=True, + ) logger.warning("[EXECUTOR] ⚠️ AgentQL component has NO api_key attribute!") - + # Store stream_topic on component so ComponentToolkit can access it if request.component_state.stream_topic: # Store stream_topic as an attribute so _attach_runtime_metadata can access it - component._stream_topic_from_backend = request.component_state.stream_topic # noqa: SLF001 + component._stream_topic_from_backend = ( + request.component_state.stream_topic + ) # noqa: SLF001 # Ensure runtime inputs also populate component attributes for template rendering if deserialized_inputs: @@ -806,40 +929,66 @@ async def execute_component(request: ExecutionRequest) -> ExecutionResponse: f"Executing method {request.method_name} " f"(async={request.is_async}) on {request.component_state.component_class}" ) - + # DEBUG: Log if this is to_toolkit for AgentQL - if request.method_name == "to_toolkit" and request.component_state.component_class == "AgentQL": - print(f"[EXECUTOR] 🎯 Executing to_toolkit for AgentQL component", flush=True) + if ( + request.method_name == "to_toolkit" + and request.component_state.component_class == "AgentQL" + ): + print("[EXECUTOR] 🎯 Executing to_toolkit for AgentQL component", flush=True) api_key_in_params = request.component_state.parameters.get("api_key") - print(f"[EXECUTOR] 🎯 AgentQL api_key in component_state.parameters BEFORE to_toolkit: {repr(api_key_in_params)}", flush=True) + print( + f"[EXECUTOR] 🎯 AgentQL api_key in component_state.parameters " + f"BEFORE to_toolkit: {repr(api_key_in_params)}", + flush=True, + ) # Also check if component instance has api_key if hasattr(component, "api_key"): - print(f"[EXECUTOR] 🎯 AgentQL component.api_key attribute: {repr(getattr(component, 'api_key', None))}", flush=True) + print( + f"[EXECUTOR] 🎯 AgentQL component.api_key attribute: " + f"{repr(getattr(component, 'api_key', None))}", + flush=True, + ) if request.is_async: result = await asyncio.wait_for(method(), timeout=request.timeout) else: # Run sync method in thread pool - result = await asyncio.wait_for( - asyncio.to_thread(method), timeout=request.timeout - ) - + result = await asyncio.wait_for(asyncio.to_thread(method), timeout=request.timeout) + # DEBUG: Log result after to_toolkit - if request.method_name == "to_toolkit" and request.component_state.component_class == "AgentQL": + if ( + request.method_name == "to_toolkit" + and request.component_state.component_class == "AgentQL" + ): print(f"[EXECUTOR] 🎯 to_toolkit result type: {type(result)}", flush=True) if isinstance(result, list) and len(result) > 0: first_tool = result[0] print(f"[EXECUTOR] 🎯 First tool type: {type(first_tool)}", flush=True) if hasattr(first_tool, "metadata"): - print(f"[EXECUTOR] 🎯 First tool metadata keys: {list(first_tool.metadata.keys()) if first_tool.metadata else 'NONE'}", flush=True) + print( + f"[EXECUTOR] 🎯 First tool metadata keys: " + f"{list(first_tool.metadata.keys()) if first_tool.metadata else 'NONE'}", + flush=True, + ) if first_tool.metadata and "_component_state" in first_tool.metadata: comp_state = first_tool.metadata["_component_state"] if isinstance(comp_state, dict) and "parameters" in comp_state: params = comp_state["parameters"] - api_key_val = params.get("api_key") if isinstance(params, dict) else None - print(f"[EXECUTOR] 🎯 First tool _component_state['parameters']['api_key']: {repr(api_key_val)}", flush=True) + api_key_val = ( + params.get("api_key") if isinstance(params, dict) else None + ) + print( + "[EXECUTOR] 🎯 First tool " + "_component_state['parameters']['api_key']: " + f"{repr(api_key_val)}", + flush=True, + ) else: - print(f"[EXECUTOR] ⚠️ First tool has NO _component_state in metadata!", flush=True) + print( + "[EXECUTOR] ⚠️ First tool has NO _component_state in metadata!", + flush=True, + ) execution_time = time.time() - start_time @@ -866,17 +1015,21 @@ async def execute_component(request: ExecutionRequest) -> ExecutionResponse: type(result).__name__, result_preview, ) - + # Publish result to NATS stream if topic is provided if request.component_state.stream_topic: topic = request.component_state.stream_topic - logger.info(f"[NATS] Attempting to publish to topic: {topic} with message_id: {message_id}") - print(f"[NATS] Attempting to publish to topic: {topic} with message_id: {message_id}") + logger.info( + f"[NATS] Attempting to publish to topic: {topic} " f"with message_id: {message_id}" + ) + print( + f"[NATS] Attempting to publish to topic: {topic} " f"with message_id: {message_id}" + ) try: nats_client = await get_nats_client() if nats_client: - logger.info(f"[NATS] NATS client obtained, preparing publish data...") - print(f"[NATS] NATS client obtained, preparing publish data...") + logger.info("[NATS] NATS client obtained, preparing publish data...") + print("[NATS] NATS client obtained, preparing publish data...") # Publish result to NATS with message ID from backend publish_data = { "message_id": message_id, # Use message_id from backend request @@ -886,21 +1039,41 @@ async def execute_component(request: ExecutionRequest) -> ExecutionResponse: "result_type": type(result).__name__, "execution_time": execution_time, } - logger.info(f"[NATS] Publishing to topic: {topic}, message_id: {message_id}, data keys: {list(publish_data.keys())}") - print(f"[NATS] Publishing to topic: {topic}, message_id: {message_id}, data keys: {list(publish_data.keys())}") - # Use the topic directly (already in format: droq.local.public.userid.workflowid.component.out) + logger.info( + f"[NATS] Publishing to topic: {topic}, message_id: {message_id}, " + f"data keys: {list(publish_data.keys())}" + ) + print( + f"[NATS] Publishing to topic: {topic}, message_id: {message_id}, " + f"data keys: {list(publish_data.keys())}" + ) + # Use the topic directly (already in format: + # droq.local.public.userid.workflowid.component.out) await nats_client.publish(topic, publish_data) - logger.info(f"[NATS] ✅ Successfully published result to NATS topic: {topic} with message_id: {message_id}") - print(f"[NATS] ✅ Successfully published result to NATS topic: {topic} with message_id: {message_id}") + logger.info( + f"[NATS] ✅ Successfully published result to NATS topic: {topic} " + f"with message_id: {message_id}" + ) + print( + f"[NATS] ✅ Successfully published result to NATS topic: {topic} " + f"with message_id: {message_id}" + ) else: - logger.warning(f"[NATS] NATS client is None, cannot publish") - print(f"[NATS] ⚠️ NATS client is None, cannot publish") + logger.warning("[NATS] NATS client is None, cannot publish") + print("[NATS] ⚠️ NATS client is None, cannot publish") except Exception as e: # Non-critical: log but don't fail execution - logger.warning(f"[NATS] ❌ Failed to publish to NATS (non-critical): {e}", exc_info=True) + logger.warning( + f"[NATS] ❌ Failed to publish to NATS (non-critical): {e}", + exc_info=True, + ) print(f"[NATS] ❌ Failed to publish to NATS (non-critical): {e}") else: - msg = f"[NATS] ⚠️ No stream_topic provided in request, skipping NATS publish. Component: {request.component_state.component_class}, ID: {request.component_state.component_id}" + msg = ( + f"[NATS] ⚠️ No stream_topic provided in request, skipping NATS publish. " + f"Component: {request.component_state.component_class}, " + f"ID: {request.component_state.component_id}" + ) logger.info(msg) print(msg) @@ -909,10 +1082,11 @@ async def execute_component(request: ExecutionRequest) -> ExecutionResponse: success=True, result_type=type(result).__name__, execution_time=execution_time, - message_id=message_id, # Return message ID (from request or generated) so backend can match it + message_id=message_id, # Return message ID (from request or generated) + # so backend can match it ) - except asyncio.TimeoutError: + except TimeoutError: execution_time = time.time() - start_time error_msg = f"Execution timed out after {request.timeout}s" logger.error(error_msg) @@ -950,11 +1124,10 @@ async def health_check() -> dict[str, str]: async def root() -> dict[str, Any]: """Root endpoint.""" return { - "service": "Langflow Executor Node", + "service": "Langflow Tool Executor Node", "version": "0.1.0", "endpoints": { "execute": "/api/v1/execute", "health": "/health", }, } - diff --git a/src/tool_executor/main.py b/src/node/main.py similarity index 96% rename from src/tool_executor/main.py rename to src/node/main.py index b457215..545b582 100644 --- a/src/tool_executor/main.py +++ b/src/node/main.py @@ -8,7 +8,7 @@ import uvicorn -from tool_executor.api import app +from node.api import app logger = logging.getLogger(__name__) diff --git a/src/tool_executor/nats.py b/src/node/nats.py similarity index 90% rename from src/tool_executor/nats.py rename to src/node/nats.py index e07483f..04979cd 100644 --- a/src/tool_executor/nats.py +++ b/src/node/nats.py @@ -57,11 +57,14 @@ async def _ensure_stream(self) -> None: stream_info = await self.js.stream_info(self.stream_name) logger.info(f"Stream '{self.stream_name}' already exists") logger.info(f"Stream subjects: {stream_info.config.subjects}") - + # Check if 'droq.local.public.>' is in subjects, if not, update stream required_subject = "droq.local.public.>" if required_subject not in stream_info.config.subjects: - logger.warning(f"Stream '{self.stream_name}' missing required subject '{required_subject}', updating...") + logger.warning( + f"Stream '{self.stream_name}' missing required subject " + f"'{required_subject}', updating..." + ) subjects = list(stream_info.config.subjects) + [required_subject] await self.js.update_stream( StreamConfig( @@ -71,7 +74,9 @@ async def _ensure_stream(self) -> None: storage=stream_info.config.storage, ) ) - logger.info(f"Stream '{self.stream_name}' updated with subject '{required_subject}'") + logger.info( + f"Stream '{self.stream_name}' updated with subject " f"'{required_subject}'" + ) except Exception as e: # Stream doesn't exist, create it logger.info(f"Creating stream '{self.stream_name}' (error: {e})") @@ -80,13 +85,16 @@ async def _ensure_stream(self) -> None: name=self.stream_name, subjects=[ f"{self.stream_name}.>", # Backward compatibility - "droq.local.public.>", # Full topic path format + "droq.local.public.>", # Full topic path format ], retention=RetentionPolicy.WORK_QUEUE, storage=StorageType.FILE, ) ) - logger.info(f"Stream '{self.stream_name}' created with subjects: ['{self.stream_name}.>', 'droq.local.public.>']") + logger.info( + f"Stream '{self.stream_name}' created with subjects: " + f"['{self.stream_name}.>', 'droq.local.public.>']" + ) async def publish( self, @@ -116,8 +124,11 @@ async def publish( # Encode data as JSON payload = json.dumps(data).encode() payload_size = len(payload) - - logger.info(f"[NATS] Publishing to subject: {full_subject}, payload size: {payload_size} bytes") + + logger.info( + f"[NATS] Publishing to subject: {full_subject}, " + f"payload size: {payload_size} bytes" + ) # Publish with headers if provided if headers: @@ -125,7 +136,8 @@ async def publish( else: ack = await self.js.publish(full_subject, payload) - logger.info(f"[NATS] ✅ Published message to {full_subject} (seq: {ack.seq if hasattr(ack, 'seq') else 'N/A'})") + seq_info = ack.seq if hasattr(ack, "seq") else "N/A" + logger.info(f"[NATS] ✅ Published message to {full_subject} (seq: {seq_info})") except Exception as e: logger.error(f"Failed to publish message: {e}") raise