feat: implement unified storage system with multiple backends #979
base: main
Conversation
…gents

- Add unified storage interface (BaseStorage) with async/await support
- Implement 8+ storage backends: SQLite, MongoDB, PostgreSQL, Redis, DynamoDB, S3, GCS, Azure
- Enhance Memory class with backward-compatible multi-storage support
- Add configuration-first approach for easy provider switching
- Support primary + cache storage patterns
- Maintain full backward compatibility with existing code
- Add graceful dependency handling for optional backends

Resolves #971

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
📝 Walkthrough

This PR introduces a comprehensive pluggable storage architecture with multiple backend implementations (SQLite, PostgreSQL, MongoDB, Redis, DynamoDB, S3, GCS, Azure) and an optional EnhancedMemory wrapper providing unified async APIs with optional caching support, and it updates the Memory import system with graceful fallback handling for missing optional dependencies.
Sequence Diagram

```mermaid
sequenceDiagram
    participant App as Application
    participant EM as EnhancedMemory
    participant Cache as Cache Backend
    participant Storage as Storage Backend<br/>(SQLite/Mongo/Redis/etc)
    App->>EM: store(key, data, use_cache=True)
    EM->>Cache: write to cache
    Cache-->>EM: cache write complete
    EM->>Storage: async write(key, data)
    Storage-->>EM: write result
    EM-->>App: return success
    App->>EM: retrieve(key, check_cache=True)
    EM->>Cache: check_cache(key)
    Cache-->>EM: found in cache
    EM-->>App: return cached data
    App->>EM: retrieve(key, check_cache=True)
    EM->>Cache: check_cache(key)
    Cache-->>EM: cache miss
    EM->>Storage: async read(key)
    Storage-->>EM: data from storage
    EM->>Cache: write to cache
    Cache-->>EM: cache write complete
    EM-->>App: return data
    App->>EM: search_unified(query)
    EM->>Storage: async search(query)
    Storage-->>EM: filtered results
    EM-->>App: return results
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 3 passed
Summary of Changes
Hello @MervinPraison, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly upgrades the data persistence capabilities of the praisonai-agents library by implementing a robust, unified storage system. It abstracts away the complexities of different database and cloud storage technologies behind a common asynchronous interface, offering developers a wide range of scalable and performant options for managing agent memory and data. The design ensures seamless integration and future extensibility while preserving compatibility with existing codebases.
Highlights
- Unified Storage Abstraction: I've introduced a new `BaseStorage` abstract class that defines a common asynchronous interface for all storage operations (read, write, delete, search, list keys, clear, and batch operations). This provides a consistent API for interacting with various data stores.
- Expanded Storage Backend Support: This PR adds comprehensive support for 8 new storage backends: MongoDB, PostgreSQL, Redis, DynamoDB, AWS S3, Google Cloud Storage, Azure Blob Storage, and an enhanced SQLite implementation. Each backend adheres to the new `BaseStorage` interface, allowing for flexible configuration.
- Enhanced Memory Management: The core `Memory` class has been refactored into `EnhancedMemory`, which now leverages the new unified storage system. It supports configuring both a primary storage and an optional cache storage, enabling more sophisticated memory strategies.
- Backward Compatibility: Crucially, the changes maintain full backward compatibility. The `memory/__init__.py` now intelligently imports `EnhancedMemory` if the necessary storage dependencies are available, otherwise falling back to the original `Memory` implementation. Existing synchronous memory methods are wrapped to utilize the new asynchronous storage backends.
- Configuration-First Approach: The new storage system is designed with a configuration-first approach, allowing users to easily select and configure their preferred storage providers with minimal code changes on the client side.
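To make the configuration-first approach and the primary + cache pattern concrete, here is a minimal sketch. The configuration key names ("provider", "config", "cache") and connection parameters are illustrative assumptions, not confirmed by this PR; only the `Memory` entry point and the `store_short_term`/`search_short_term` methods appear in the changed code.

```python
from praisonaiagents.memory import Memory

# Hypothetical configuration: a primary PostgreSQL backend with a Redis cache
# layer. Key names and parameters are assumed for illustration only.
memory = Memory(config={
    "provider": "postgresql",
    "config": {"dsn": "postgresql://localhost/agents"},
    "cache": {
        "provider": "redis",
        "config": {"host": "localhost", "port": 6379},
    },
})

# The existing synchronous API is stated to remain available for backward compatibility.
memory.store_short_term("User prefers concise answers", metadata={"topic": "style"})
results = memory.search_short_term("concise", limit=5)
```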
Code Review
This pull request introduces a unified storage system with multiple backends, enhancing modularity and extensibility. The review focuses on improving robustness, efficiency, and maintainability, including correcting asyncio event loop management, fixing a missing import, addressing performance issues in cloud storage and Redis backends, and improving database operation robustness.
```python
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    return loop.run_until_complete(self.store(key, data))
finally:
    loop.close()
```
Creating a new event loop with asyncio.new_event_loop() for each synchronous call is inefficient and can cause issues. It will fail if an event loop is already running in the thread, which is common in async frameworks or in Jupyter notebooks.
A safer approach for running an async function from a sync context is asyncio.run(). While it also has limitations (it can't be called when a loop is already running), it's an improvement over managing loops manually.
This issue is present in store_short_term, search_short_term, reset_short_term, and reset_all.
```diff
-loop = asyncio.new_event_loop()
-asyncio.set_event_loop(loop)
-try:
-    return loop.run_until_complete(self.store(key, data))
-finally:
-    loop.close()
+return asyncio.run(self.store(key, data))
```
```python
try:
    import boto3
    from boto3.dynamodb.conditions import Key, Attr
```
```python
async def search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Search for records matching the query."""
    await self._ensure_connection()

    loop = asyncio.get_event_loop()

    try:
        # List all objects with prefix
        response = await loop.run_in_executor(
            None,
            self.s3_client.list_objects_v2,
            {"Bucket": self.bucket, "Prefix": self.prefix}
        )

        results = []
        limit = query.get("limit", 100)

        for obj in response.get("Contents", []):
            if len(results) >= limit:
                break

            s3_key = obj["Key"]
            key = self._strip_prefix(s3_key)

            # Read and filter object
            record = await self.read(key)
            if record and self._matches_query(record, query):
                results.append(record)

        # Sort by updated_at descending
        results.sort(key=lambda x: x.get("updated_at", 0), reverse=True)

        return results

    except Exception as e:
        self.logger.error(f"Failed to search: {e}")
        return []
```
The search implementation for S3 is inefficient. It lists all objects under the prefix and then reads each object's full content to perform client-side filtering. For a large number of objects, this will be slow and can incur significant costs due to the high number of GET requests.
Consider using S3 Select for server-side filtering to improve performance. At a minimum, this performance limitation should be clearly documented in the docstring to warn users.
This same issue exists for GCSStorage and AzureStorage as well.
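A minimal sketch of the S3 Select approach, assuming one JSON document per object and the `self.s3_client` / `self.bucket` attributes from this PR; the helper name `_select_object` is hypothetical:

```python
import asyncio
from functools import partial

async def _select_object(self, s3_key: str, expression: str) -> str:
    """Run an S3 Select SQL expression against a single JSON object and
    return the matching records as a JSON-lines string."""
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None,
        partial(
            self.s3_client.select_object_content,
            Bucket=self.bucket,
            Key=s3_key,
            ExpressionType="SQL",
            # e.g. "SELECT * FROM S3Object s WHERE s.content LIKE '%weather%'"
            Expression=expression,
            InputSerialization={"JSON": {"Type": "DOCUMENT"}},
            OutputSerialization={"JSON": {}},
        ),
    )
    # select_object_content returns an event stream; keep only Records payloads.
    chunks = [event["Records"]["Payload"]
              for event in response["Payload"]
              if "Records" in event]
    return b"".join(chunks).decode("utf-8")
```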
```python
async def clear(self) -> bool:
    """Clear all records from storage."""
    await self._ensure_connection()

    loop = asyncio.get_event_loop()

    try:
        # List all objects
        response = await loop.run_in_executor(
            None,
            self.s3_client.list_objects_v2,
            {"Bucket": self.bucket, "Prefix": self.prefix}
        )

        # Delete all objects
        for obj in response.get("Contents", []):
            await loop.run_in_executor(
                None,
                self.s3_client.delete_object,
                {"Bucket": self.bucket, "Key": obj["Key"]}
            )

        return True

    except Exception as e:
        self.logger.error(f"Failed to clear storage: {e}")
        return False
```
The clear method deletes objects one by one in a loop, which is inefficient for a large number of objects. Cloud storage providers offer batch deletion APIs that are faster and more cost-effective.
For S3, you should use s3_client.delete_objects(). This same issue applies to GCSStorage (use bucket.delete_blobs()) and AzureStorage (use container_client.delete_blobs()).
```python
async def clear(self) -> bool:
    """Clear all records from storage."""
    await self._ensure_connection()
    loop = asyncio.get_event_loop()
    try:
        # Use paginator to handle large number of objects
        paginator = self.s3_client.get_paginator('list_objects_v2')
        pages = paginator.paginate(Bucket=self.bucket, Prefix=self.prefix)
        for page in pages:
            if 'Contents' not in page:
                continue
            objects_to_delete = [{'Key': obj['Key']} for obj in page['Contents']]
            if not objects_to_delete:
                continue
            await loop.run_in_executor(
                None,
                self.s3_client.delete_objects,
                {'Bucket': self.bucket, 'Delete': {'Objects': objects_to_delete}}
            )
        return True
    except Exception as e:
        self.logger.error(f"Failed to clear storage: {e}")
        return False
```

```python
async def search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Search for records matching the query.

    Note: Redis doesn't have native search capabilities like MongoDB/PostgreSQL.
    This implementation scans all keys and filters client-side, which may be slow
    for large datasets. Consider using RedisSearch module for production use.
    """
    await self._ensure_connection()

    try:
        # Get all keys with our prefix
        pattern = f"{self.key_prefix}*"
        keys = await self.redis.keys(pattern)

        if not keys:
            return []

        # Get all records
        raw_data = await self.redis.mget(keys)
        results = []

        # Process and filter records
        for i, data in enumerate(raw_data):
            if data:
                try:
                    json_str = self._decompress_data(data)
                    record = json.loads(json_str)
                    record["id"] = self._strip_prefix(keys[i].decode())

                    # Apply filters
                    if self._matches_query(record, query):
                        results.append(record)

                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    self.logger.error(f"Failed to decode record: {e}")
                    continue

        # Sort by updated_at descending
        results.sort(key=lambda x: x.get("updated_at", 0), reverse=True)

        # Apply limit
        limit = query.get("limit", 100)
        return results[:limit]

    except RedisError as e:
        self.logger.error(f"Failed to search: {e}")
        return []
```
The search method uses the KEYS command, which is a blocking operation that can severely degrade Redis performance in production, especially with a large number of keys. It's strongly recommended to use SCAN instead, which iterates through the keyspace without blocking the server.
This same issue applies to the clear and count methods.
```python
async def search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Search for records matching the query.
    Note: This implementation scans keys and filters client-side, which may be slow
    for large datasets. Consider using RedisSearch module for production use.
    """
    await self._ensure_connection()
    try:
        results = []
        limit = query.get("limit", 100)
        # Use SCAN instead of KEYS to avoid blocking the server
        async for key_bytes in self.redis.scan_iter(match=f"{self.key_prefix}*"):
            data = await self.redis.get(key_bytes)
            if data:
                try:
                    json_str = self._decompress_data(data)
                    record = json.loads(json_str)
                    record["id"] = self._strip_prefix(key_bytes.decode())
                    if self._matches_query(record, query):
                        results.append(record)
                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    self.logger.error(f"Failed to decode record: {e}")
                    continue
        results.sort(key=lambda x: x.get("updated_at", 0), reverse=True)
        return results[:limit]
    except RedisError as e:
        self.logger.error(f"Failed to search: {e}")
        return []
```

```python
result = await conn.execute(
    f"DELETE FROM {self.schema}.{self.table_name} WHERE id = $1",
    key
)
# Extract affected rows from result string like "DELETE 1"
affected_rows = int(result.split()[-1]) if result.split() else 0
return affected_rows > 0
```
Parsing the status string returned by conn.execute (e.g., "DELETE 1") to determine the number of affected rows is brittle, as the format is not guaranteed to be stable across asyncpg versions.
A more robust approach is to use the RETURNING clause to confirm the deletion.
```diff
-result = await conn.execute(
-    f"DELETE FROM {self.schema}.{self.table_name} WHERE id = $1",
-    key
-)
-# Extract affected rows from result string like "DELETE 1"
-affected_rows = int(result.split()[-1]) if result.split() else 0
-return affected_rows > 0
+status = await conn.fetchval(
+    f"DELETE FROM {self.schema}.{self.table_name} WHERE id = $1 RETURNING id",
+    key
+)
+return status is not None
```
Bug: Async Initialization Flaws in SQLiteStorage
The SQLiteStorage constructor's asynchronous initialization is flawed. The check asyncio.get_event_loop().is_running() is problematic: asyncio.get_event_loop() is deprecated when called without a running event loop and raises RuntimeError on newer Python versions, so the constructor can fail in ordinary synchronous code. Additionally, the asyncio.create_task() call is neither awaited nor stored, leading to a race condition where the constructor returns before database initialization completes. This can cause subsequent operations to fail or the task to be garbage collected. The initialization should use asyncio.get_running_loop() with proper error handling and ensure the task is managed.
src/praisonai-agents/praisonaiagents/storage/sqlite_storage.py#L44-L46
```python
# Initialize database
asyncio.create_task(self._init_db()) if asyncio.get_event_loop().is_running() else self._init_db_sync()
```
Bug: Async Calls in Sync Batch Writer
The DynamoDBStorage class incorrectly wraps batch.put_item and batch.delete_item calls with await loop.run_in_executor inside the synchronous boto3.table.batch_writer() context. This misuses the batch writer, which is designed for synchronous operations, leading to potential resource leaks, inconsistent batch behavior, and thread safety concerns.
src/praisonai-agents/praisonaiagents/storage/dynamodb_storage.py#L429-L437 (at ea5c325)
```python
# Delete all items
with self.table.batch_writer() as batch:
    for item in response.get("Items", []):
        await loop.run_in_executor(
            None,
            batch.delete_item,
            {"Key": {"id": item["id"]}}
        )
```
src/praisonai-agents/praisonaiagents/storage/dynamodb_storage.py#L469-L474 (at ea5c325)
```python
await loop.run_in_executor(
    None,
    batch.put_item,
    {"Item": item}
)
```
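One way to keep the batch writer's synchronous context manager intact is to run the entire batching block in a worker thread. A minimal sketch, assuming `self.table` and the item shape from the excerpts above; the helper name is hypothetical:

```python
import asyncio

async def _delete_items_in_batch(self, items):
    """Run the whole synchronous batch_writer block off the event loop."""
    loop = asyncio.get_running_loop()

    def _delete_all():
        # batch_writer() is synchronous; keeping the context manager and the
        # delete calls on one thread lets __exit__ flush the batch correctly.
        with self.table.batch_writer() as batch:
            for item in items:
                batch.delete_item(Key={"id": item["id"]})

    await loop.run_in_executor(None, _delete_all)
```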
Bug: Async/Sync Bridging Causes Event Loop Conflicts
The EnhancedMemory class contains an incorrect async/sync bridging pattern in methods like store_short_term, search_short_term, reset_short_term, and reset_all. These methods create a new event loop with asyncio.new_event_loop() and set it as the current loop with asyncio.set_event_loop(loop). This can interfere with existing event loops, cause a RuntimeError if called within an already running async context, and does not restore the original event loop, potentially breaking other async code. The pattern should be replaced with asyncio.run() or proper handling of existing event loops.
src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py#L510-L517 (at ea5c325)
```python
# Use async storage
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    return loop.run_until_complete(self.store(key, data))
finally:
    loop.close()
else:
```
Bug: S3Storage Class Fails Boto3 API Calls
The S3Storage class incorrectly calls boto3 methods via asyncio.run_in_executor. It passes a dictionary (e.g., {'Bucket': self.bucket}) as a single positional argument, but boto3 methods expect keyword arguments (e.g., Bucket=self.bucket). This error affects all boto3 API calls within the class (e.g., head_bucket, get_object, put_object, list_objects_v2), causing runtime errors.
src/praisonai-agents/praisonaiagents/storage/cloud_storage.py#L144-L149 (at ea5c325)
```python
try:
    await loop.run_in_executor(
        None,
        self.s3_client.head_bucket,
        {"Bucket": self.bucket}
    )
```
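A minimal sketch of the keyword-binding fix with `functools.partial`, assuming the `self.s3_client` and `self.bucket` attributes from the excerpt above (boto3 client methods take keyword arguments, while `run_in_executor` only forwards positional ones):

```python
import asyncio
from functools import partial

async def _verify_bucket(self):
    """Bind keyword arguments before handing the call to the executor."""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        partial(self.s3_client.head_bucket, Bucket=self.bucket),
    )
```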
Bug: SQL Injection via JSONB Key Interpolation
The PostgreSQLStorage.search method is vulnerable to SQL injection when searching by metadata. The PostgreSQL ->> operator requires the JSONB key to be a literal string, not a parameterized value. The current implementation attempts to parameterize the key (e.g., metadata ->> $1), which results in invalid SQL. To execute a valid query, the key must be directly interpolated into the SQL string. Since this key originates from user input (query["metadata"]), direct interpolation creates a SQL injection vulnerability.
src/praisonai-agents/praisonaiagents/storage/postgresql_storage.py#L246-L251 (at ea5c325)
```python
for key, value in query["metadata"].items():
    param_count += 1
    where_conditions.append(f"metadata ->> ${param_count} = ${param_count + 1}")
    params.extend([key, str(value)])
    param_count += 1
```
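One containment-based alternative keeps both the metadata key and the value as a single bind parameter, avoiding any interpolation of user-supplied keys into the SQL text. A sketch for string-valued metadata, reusing the loop variables from the excerpt above:

```python
import json

# Sketch: jsonb containment; the user-supplied key and value travel together
# as one bind parameter instead of being spliced into the query string.
for key, value in query["metadata"].items():
    param_count += 1
    where_conditions.append(f"metadata @> ${param_count}::jsonb")
    params.append(json.dumps({key: str(value)}))
```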
Bug: Circular Import Issue in Memory Factory
A circular import bug exists in the Memory factory function within enhanced_memory.py. When STORAGE_AVAILABLE is False, the function attempts to import Memory from .memory (i.e., memory/__init__.py), which itself imports from enhanced_memory.py, leading to a circular dependency and an ImportError at runtime.
src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py#L641-L647 (at ea5c325)
| """ | |
| if STORAGE_AVAILABLE: | |
| return EnhancedMemory(config, verbose) | |
| else: | |
| # Import and return original Memory class | |
| from .memory import Memory as OriginalMemory | |
| return OriginalMemory(config, verbose) |
@claude review this pull request and do a detailed analysis and fix this if the existing code doesn't have the solution implemented. Making sure it has backward compatibility, no existing features removed. After making those changes, again review the applied changes. Use @web to search if you don't know any information or to find the latest documentation or to find the latest version. Run the code if you think you need to run it to test it. Minimal code change to start with if required any changes.
Claude encountered an error — View job. I'll analyze this and get back to you.
ea5c325 to f277d86 (Compare)
Cursor Bugbot has reviewed your changes and found 6 potential issues.
```python
    None,
    self.s3_client.get_object,
    {"Bucket": self.bucket, "Key": s3_key}
)
```
Incorrect run_in_executor usage with dictionary arguments
High Severity
The run_in_executor calls throughout S3Storage and DynamoDBStorage incorrectly pass a dictionary as a positional argument to boto3 methods. run_in_executor(executor, func, *args) passes positional args, but boto3 methods like get_object, put_object, head_bucket, etc. expect keyword arguments (e.g., Bucket=..., Key=...). Passing {"Bucket": self.bucket, "Key": s3_key} as a single positional arg causes the boto3 methods to fail with a TypeError at runtime. This affects all storage operations in both S3 and DynamoDB backends.
Additional Locations (2)
```python
# Create table
self.table = await loop.run_in_executor(
    None, self.dynamodb.create_table, **table_kwargs
)
```
Invalid kwargs unpacking in run_in_executor call
High Severity
The run_in_executor call uses **table_kwargs syntax which is invalid. run_in_executor only accepts positional arguments after the function, not keyword arguments. The **table_kwargs will be unpacked as arguments to run_in_executor itself (which doesn't accept these kwargs), rather than being passed to create_table. This will raise a TypeError when creating a new DynamoDB table.
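The same `functools.partial` binding applies here. A small sketch, assuming `table_kwargs`, `loop`, and `self.dynamodb` as in the excerpt above:

```python
from functools import partial

async def _create_table(self, loop, table_kwargs):
    # run_in_executor forwards only positional args, so bind the keyword
    # arguments up front instead of unpacking them into run_in_executor itself.
    self.table = await loop.run_in_executor(
        None,
        partial(self.dynamodb.create_table, **table_kwargs),
    )
```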
| elif provider_lower == "gcs": | ||
| return GCSStorage(config) | ||
| elif provider_lower == "azure": | ||
| return AzureStorage(config) |
Storage class instantiation fails when dependency unavailable
High Severity
When a user configures a storage provider (e.g., "mongodb", "postgresql") but the required dependency isn't installed, the corresponding storage class variable will be None (from the storage module's fallback). Calling MongoDBStorage(config) when MongoDBStorage is None raises TypeError: 'NoneType' object is not callable, giving users no indication of the actual problem (missing dependency). The code lacks checks to verify the storage class is available before instantiation.
```python
if self.project:
    client_kwargs["project"] = self.project
if self.credentials_path:
    client_kwargs["credentials"] = self.credentials_path
```
GCS credentials parameter expects object not path
Medium Severity
The GCSStorage._ensure_connection method passes self.credentials_path (a string file path) directly to the gcs.Client constructor's credentials parameter. The GCS Client expects a google.auth.credentials.Credentials object, not a string path. Using a path string will cause the client to misinterpret the credentials and fail. The path needs to be loaded using service_account.Credentials.from_service_account_file() first.
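A minimal sketch of loading the service-account file instead of passing the path string, assuming the `self.credentials_path` and `self.project` attributes from the excerpt above; `Client.from_service_account_json` is the standard google-cloud-storage constructor for this case:

```python
from google.cloud import storage as gcs

def _make_gcs_client(self) -> "gcs.Client":
    """Build a client from a service-account JSON file when one is configured."""
    if self.credentials_path:
        # Loads a Credentials object from the file rather than passing the path.
        return gcs.Client.from_service_account_json(
            self.credentials_path, project=self.project
        )
    return gcs.Client(project=self.project)
```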
```python
    blob.upload_from_string,
    content,
    content_type="application/json"
)
```
GCS upload_from_string keyword arg fails in run_in_executor
High Severity
The run_in_executor call passes content_type="application/json" as a keyword argument, but run_in_executor only accepts positional arguments for the target function (after executor and func). The content_type keyword will be interpreted as an argument to run_in_executor itself, causing a TypeError. This affects all GCS write operations.
```python
os.makedirs(os.path.dirname(self.db_path) or ".", exist_ok=True)

# Initialize database
asyncio.create_task(self._init_db()) if asyncio.get_event_loop().is_running() else self._init_db_sync()
```
SQLiteStorage init crashes on Python 3.10+ without loop
High Severity
The SQLiteStorage.__init__ calls asyncio.get_event_loop().is_running() to decide between sync and async initialization. In Python 3.10+, calling get_event_loop() when no event loop exists emits a DeprecationWarning, and in Python 3.12+ this raises a RuntimeError. Since SQLiteStorage is typically instantiated synchronously from EnhancedMemory.__init__, this will crash on Python 3.12+ or cause warnings on earlier versions.
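A minimal sketch of a safer initialization scheme: no event-loop probing in `__init__`, lazy async initialization from inside a running loop, and an explicit synchronous factory. `_init_db` and `_init_db_sync` are assumed from the PR; the names `ensure_initialized` and `create_sync` are hypothetical.

```python
import asyncio
from typing import Any, Dict


class SQLiteStorage:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self._initialized = False  # no loop probing, no fire-and-forget task

    async def ensure_initialized(self) -> None:
        """Create the schema lazily from inside a running event loop."""
        if not self._initialized:
            await self._init_db()        # assumed from the PR
            self._initialized = True

    @classmethod
    def create_sync(cls, config: Dict[str, Any]) -> "SQLiteStorage":
        """Factory for synchronous callers; uses the blocking initializer."""
        inst = cls(config)
        inst._init_db_sync()             # assumed from the PR
        inst._initialized = True
        return inst
```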
Actionable comments posted: 14
🤖 Fix all issues with AI agents
In `@src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py`:
- Around line 535-541: Update the docstrings for store_long_term and
search_long_term to explicitly state they are aliases that delegate to
store_short_term and search_short_term (i.e., long-term is not differentiated
from short-term in this implementation), and add a short note for users about
this behavior and potential future differentiation so callers aren’t surprised;
locate the methods named store_long_term and search_long_term and modify their
triple-quoted docstrings accordingly.
- Around line 157-178: The _create_storage_backend function currently
instantiates backend classes directly which can be None when optional backends
are missing; update each branch (e.g., SQLiteStorage, MongoDBStorage,
PostgreSQLStorage, RedisStorage, DynamoDBStorage, S3Storage, GCSStorage,
AzureStorage) to first check whether the class reference is not None and if it
is None raise a clear ImportError or ValueError (e.g., "Backend X is unavailable
— install optional dependency Y") otherwise instantiate and return the backend
with config; keep the existing provider_lower checks and the final
unsupported-provider ValueError.
- Around line 566-572: The search method currently accepts agent_id and run_id
but never uses them; either implement filtering or document they're
placeholders. To implement, propagate agent_id and run_id into the underlying
calls (pass them from search into search_user_memory and search_short_term) and
update those functions (search_user_memory, search_short_term) to accept and
apply agent_id/run_id filters in their query logic; ensure tests and any
DB/query builders respect these new filters. Alternatively, if not supporting
filtering yet, remove agent_id/run_id from search signature or explicitly state
in the search docstring that agent_id and run_id are not supported and
optionally emit a debug log/warning when they are provided.
- Around line 458-481: In _legacy_search: sanitize and escape SQL LIKE wildcards
in the incoming text_query from query before building the parameterized pattern,
replacing '%' and '_' (and optionally the escape character) with escaped
versions, then use a single parameterized pattern (e.g., the escaped text
wrapped with '%' on both sides) in the execute call (still using
query.get("limit") as the second parameter) and include an ESCAPE clause if your
SQL requires it; reference _legacy_search, short_db, text_query, and the execute
call to locate and update the code so the wildcard characters cannot be
injected.
- Around line 500-518: The current store_short_term implementation repeatedly
creates and closes event loops (asyncio.new_event_loop()), which can leak
resources and break in environments with an existing loop; replace that block
with a safe sync-to-async call: call the coroutine via
asyncio.run(self.store(key, data)) and catch RuntimeError for "event loop is
running"—in that case obtain the running loop with asyncio.get_running_loop()
and submit the coroutine via asyncio.run_coroutine_threadsafe(self.store(key,
data), loop) (or otherwise schedule it on the running loop) so you avoid
creating new loops; keep fallback to _legacy_store when primary_storage is False
and preserve the same key/data handling.
In `@src/praisonai-agents/praisonaiagents/storage/cloud_storage.py`:
- Around line 145-149: The run_in_executor calls are passing dicts as a single
positional arg to boto3 methods (e.g. self.s3_client.head_bucket) which expects
keyword args; replace those calls by binding keyword args using
functools.partial (or a small lambda) so the boto3 methods receive proper
kwargs. Update _verify_bucket (self.s3_client.head_bucket), read
(self.s3_client.get_object), write (self.s3_client.put_object / upload), delete
(self.s3_client.delete_object), search (list/scan calls), list_keys
(self.s3_client.list_objects_v2), clear (delete_objects / list + delete), exists
(head_object) and count to use functools.partial(self.s3_client.method,
Keyword=Value, ...) or an equivalent lambda for each run_in_executor invocation.
- Around line 479-482: The code passes self.credentials_path (a file path) into
gcs.Client via client_kwargs, but gcs.Client expects a credentials object;
change the logic around self.client creation so that when self.credentials_path
is set you call gcs.Client.from_service_account_json(self.credentials_path,
**client_kwargs) to load credentials from the JSON file, otherwise call
gcs.Client(**client_kwargs). Update the creation site referencing self.client,
client_kwargs, self.credentials_path, and gcs.Client to follow this conditional
flow.
In `@src/praisonai-agents/praisonaiagents/storage/dynamodb_storage.py`:
- Around line 324-328: The variable expression_values is initialized but never
used in the try block (alongside filter_expression), so remove the unused
expression_values initialization to clean up leftover code; if the intention was
to build an ExpressionAttributeValues mapping for DynamoDB, instead implement
and use it where you build the filter (e.g., populate and pass expression_values
into the query/scan call), otherwise delete the expression_values variable and
any related dead code in the try block to avoid unused-variable warnings.
- Around line 444-483: The batch_write method mixes the synchronous DynamoDB
context manager self.table.batch_writer() with asynchronous run_in_executor
calls, causing unsafe sync/async interplay; fix by collecting prepared items
inside batch_write (use self._prepare_item and set id/created_at/updated_at/ttl)
into a local list, then either (a) perform the entire batch_writer context and
call batch.put_item synchronously for each prepared item inside run_in_executor
(i.e., wrap a function that opens self.table.batch_writer() and loops
batch.put_item(items) and run that function in the executor), or (b) avoid
run_in_executor entirely and use synchronous code inside the async method by
executing the whole batching block in loop.run_in_executor; ensure you reference
batch_write, self.table.batch_writer, batch.put_item, self._prepare_item and
preserve results mapping per record and error handling.
- Around line 429-436: The code uses the synchronous context manager
self.table.batch_writer() while awaiting loop.run_in_executor for each
batch.delete_item, which can let the context manager exit before delete tasks
complete; fix by performing the whole batch_writer block synchronously inside
the executor (or perform deletes synchronously inside the context). Concretely,
move the batch_writer loop into a synchronous helper (e.g.,
_sync_delete_items(items)) that calls batch.delete_item(...) for each item using
the same self.table.batch_writer(), then call loop.run_in_executor(None,
self._sync_delete_items, response.get("Items", [])) or simply call
batch.delete_item(...) directly without await inside the existing context so the
__exit__ flushes after deletes finish; reference symbols:
self.table.batch_writer, batch.delete_item, loop.run_in_executor.
In `@src/praisonai-agents/praisonaiagents/storage/mongodb_storage.py`:
- Around line 66-92: The lazy init in _ensure_connection is racy: add an
asyncio.Lock (e.g., self._init_lock) created in the class __init__ to serialize
initialization, then wrap the init block in "async with self._init_lock:" and
re-check self._initialized inside the lock before creating AsyncIOMotorClient,
running await self.client.admin.command('ping'), setting
self.database/collection, calling await self._create_indexes(), and setting
self._initialized; keep existing exception handling and logger calls
(self.logger) intact so only one task performs initialization and others return
once _initialized is True (a minimal sketch of this double-checked initialization pattern follows after these prompts).
- Around line 218-243: The list_keys method is vulnerable to regex injection
because the user-provided prefix is interpolated directly into a regex; update
the list_keys implementation (function name list_keys in mongodb_storage.py) to
escape regex metacharacters before building the query (e.g., use Python's
re.escape on the prefix) and then use the escaped value in the {"$regex":
f"^{escaped_prefix}"} filter; add the necessary import (re) if missing and keep
the rest of the query/limit/sorting logic unchanged.
In `@src/praisonai-agents/praisonaiagents/storage/redis_storage.py`:
- Around line 201-203: The except block currently catches a non-existent
json.JSONEncodeError; replace that with concrete serialization/runtime
exceptions thrown by json.dumps (e.g., TypeError and ValueError) so the handler
becomes except (RedisError, TypeError, ValueError) as e; make the same change
for the similar except block around lines 371-374 to ensure both write/serialize
failure paths log and return False correctly.
In `@src/praisonai-agents/praisonaiagents/storage/sqlite_storage.py`:
- Around line 44-45: The __init__ currently uses
asyncio.get_event_loop().is_running() and asyncio.create_task(self._init_db())
which can raise DeprecationWarning and can leave the task
unreferenced/garbage-collected; change this by removing the create_task call
from __init__ and provide an explicit async factory or initializer (e.g., an
async classmethod like async_create or an async_init method) that awaits
self._init_db(), keep the existing synchronous fallback _init_db_sync for sync
contexts, and update callers to use the async factory when they need async
initialization; reference the constructors and the methods _init_db and
_init_db_sync when making this change.
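For the MongoDB `_ensure_connection` prompt above, a minimal sketch of the double-checked lock pattern; attribute names follow the prompt, and the Motor-specific connection and index setup is collapsed into a hypothetical `_connect_and_create_indexes` placeholder:

```python
import asyncio

class MongoDBStorage:
    def __init__(self, config: dict):
        self.config = config
        self._initialized = False
        self._init_lock = asyncio.Lock()   # created once, in __init__

    async def _ensure_connection(self) -> None:
        """Serialize lazy initialization so only one task performs it."""
        if self._initialized:
            return
        async with self._init_lock:
            # Re-check inside the lock: another task may have finished first.
            if self._initialized:
                return
            await self._connect_and_create_indexes()   # placeholder for the PR's init logic
            self._initialized = True
```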
🧹 Nitpick comments (8)
src/praisonai-agents/praisonaiagents/storage/sqlite_storage.py (1)
194-198: Metadata search using LIKE pattern is fragile. The JSON text search pattern `'%"{key}": "{value}"%'` assumes specific JSON formatting and won't match numeric values, booleans, or differently formatted JSON. Consider using SQLite's JSON functions if available (SQLite 3.38+).

📝 Note for consideration

For SQLite 3.38+, you could use:

```python
# Using json_extract for more reliable matching
where_conditions.append("json_extract(metadata, ?) = ?")
params.append(f'$.{key}')
params.append(value)
```

This is optional since the current approach works for string values and maintains broader SQLite version compatibility.
src/praisonai-agents/praisonaiagents/storage/postgresql_storage.py (1)
94-113: Consider validating schema and table names. While static analysis SQL injection warnings are false positives when config comes from trusted sources, if these values could ever come from user input, they should be validated against a whitelist pattern (e.g., alphanumeric with underscores only).

🛡️ Optional validation example

```python
import re

def _validate_identifier(self, name: str, identifier_type: str) -> str:
    """Validate SQL identifier to prevent injection."""
    if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
        raise ValueError(f"Invalid {identifier_type}: {name}")
    return name
```

This is optional if config always comes from trusted sources like environment variables or hardcoded values.
src/praisonai-agents/praisonaiagents/storage/cloud_storage.py (1)
286-311: Duplicated `_matches_query` method across three classes. The `_matches_query` method is identical in `S3Storage`, `GCSStorage`, and `AzureStorage`. Consider extracting this to the `BaseStorage` class to reduce duplication.

♻️ Suggested refactor

Add to `BaseStorage` in `base.py`:

```python
def _matches_query(self, record: Dict[str, Any], query: Dict[str, Any]) -> bool:
    """Check if a record matches the search query."""
    # Text search in content
    if "text" in query:
        content = str(record.get("content", "")).lower()
        search_text = query["text"].lower()
        if search_text not in content:
            return False

    # Metadata search
    if "metadata" in query:
        record_metadata = record.get("metadata", {})
        for key, value in query["metadata"].items():
            if record_metadata.get(key) != value:
                return False

    # Time range filters
    if "created_after" in query:
        if record.get("created_at", 0) < query["created_after"]:
            return False
    if "created_before" in query:
        if record.get("created_at", float('inf')) > query["created_before"]:
            return False

    return True
```

Then remove the duplicated methods from `S3Storage`, `GCSStorage`, and `AzureStorage`.

Also applies to: 604-629, 898-923
src/praisonai-agents/praisonaiagents/memory/__init__.py (1)
23-23: Consider exporting `ENHANCED_AVAILABLE` for feature detection. The `ENHANCED_AVAILABLE` flag could be useful for consumers to detect which implementation is active. Consider adding it to `__all__`.

📝 Optional enhancement

```diff
-__all__ = ["Memory"]
+__all__ = ["Memory", "ENHANCED_AVAILABLE"]
```

This allows consumers to check availability with:

```python
from praisonaiagents.memory import Memory, ENHANCED_AVAILABLE

if ENHANCED_AVAILABLE:
    # Use enhanced features
    ...
```

src/praisonai-agents/praisonaiagents/storage/mongodb_storage.py (2)
286-298: Unused variable `result` from bulk_write. The `result` from `bulk_write` is assigned but never used. While this works, consider either using it to provide more accurate per-key success tracking or removing the assignment.

♻️ Proposed fix

```diff
 # Execute bulk operation
 if operations:
-    result = await self.collection.bulk_write(operations)
-
-    # Mark all as successful if bulk operation succeeded
-    for key in records.keys():
-        results[key] = True
+    await self.collection.bulk_write(operations)
+    # Mark all as successful if bulk operation completed without exception
+    results = {key: True for key in records.keys()}
```
341-345: Missing `async` keyword on `close` method. The `close` method is synchronous, but other storage backends may expect it to be async for consistency. The motor client's `close()` is synchronous, so this works, but consider adding `async` for interface consistency.

src/praisonai-agents/praisonaiagents/__init__.py (1)

134-143: Consider exposing availability flags in `__all__`. The storage classes in `__all__` may be `None` if dependencies aren't installed. Consider also exporting availability flags (e.g., `_storage_available`) or documenting this behavior so users can check before using these classes.

src/praisonai-agents/praisonaiagents/storage/__init__.py (1)
54-64: Consider exporting availability flags. The `__all__` exports all storage classes but not the availability flags (MONGODB_AVAILABLE, etc.). Exporting these would help consumers check backend availability without attempting imports.

♻️ Proposed enhancement

```diff
 __all__ = [
+    # Availability flags
+    "MONGODB_AVAILABLE",
+    "POSTGRESQL_AVAILABLE",
+    "REDIS_AVAILABLE",
+    "DYNAMODB_AVAILABLE",
+    "CLOUD_AVAILABLE",
+    # Storage classes
     "BaseStorage",
     "SQLiteStorage",
```
```python
def _create_storage_backend(self, provider: str, config: Dict[str, Any]) -> BaseStorage:
    """Create a storage backend instance."""
    provider_lower = provider.lower()

    if provider_lower == "sqlite":
        return SQLiteStorage(config)
    elif provider_lower == "mongodb":
        return MongoDBStorage(config)
    elif provider_lower == "postgresql":
        return PostgreSQLStorage(config)
    elif provider_lower == "redis":
        return RedisStorage(config)
    elif provider_lower == "dynamodb":
        return DynamoDBStorage(config)
    elif provider_lower == "s3":
        return S3Storage(config)
    elif provider_lower == "gcs":
        return GCSStorage(config)
    elif provider_lower == "azure":
        return AzureStorage(config)
    else:
        raise ValueError(f"Unsupported storage provider: {provider}")
```
Missing None checks before instantiating optional backends.
When storage backends aren't available, the classes are None. This code will raise TypeError instead of a helpful error when trying to instantiate an unavailable backend.
🐛 Proposed fix to add availability checks
```diff
 def _create_storage_backend(self, provider: str, config: Dict[str, Any]) -> BaseStorage:
     """Create a storage backend instance."""
     provider_lower = provider.lower()
     if provider_lower == "sqlite":
         return SQLiteStorage(config)
     elif provider_lower == "mongodb":
+        if MongoDBStorage is None:
+            raise ImportError("MongoDB storage requires motor and pymongo. Install with: pip install motor pymongo")
         return MongoDBStorage(config)
     elif provider_lower == "postgresql":
+        if PostgreSQLStorage is None:
+            raise ImportError("PostgreSQL storage requires asyncpg. Install with: pip install asyncpg")
         return PostgreSQLStorage(config)
     elif provider_lower == "redis":
+        if RedisStorage is None:
+            raise ImportError("Redis storage requires redis. Install with: pip install redis[aio]")
         return RedisStorage(config)
     elif provider_lower == "dynamodb":
+        if DynamoDBStorage is None:
+            raise ImportError("DynamoDB storage requires boto3. Install with: pip install boto3")
         return DynamoDBStorage(config)
     elif provider_lower == "s3":
+        if S3Storage is None:
+            raise ImportError("S3 storage requires boto3. Install with: pip install boto3")
         return S3Storage(config)
     elif provider_lower == "gcs":
+        if GCSStorage is None:
+            raise ImportError("GCS storage requires google-cloud-storage. Install with: pip install google-cloud-storage")
         return GCSStorage(config)
     elif provider_lower == "azure":
+        if AzureStorage is None:
+            raise ImportError("Azure storage requires azure-storage-blob. Install with: pip install azure-storage-blob")
         return AzureStorage(config)
     else:
         raise ValueError(f"Unsupported storage provider: {provider}")
```
🧰 Tools
🪛 Ruff (0.14.14)
[warning] 178-178: Avoid specifying long messages outside the exception class
(TRY003)
```python
def _legacy_search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Legacy search implementation for backward compatibility."""
    try:
        if hasattr(self, 'short_db'):
            conn = sqlite3.connect(self.short_db)
            text_query = query.get("text", "")
            rows = conn.execute(
                "SELECT id, content, meta, created_at FROM short_mem WHERE content LIKE ? LIMIT ?",
                (f"%{text_query}%", query.get("limit", 100))
            ).fetchall()
            conn.close()

            results = []
            for row in rows:
                results.append({
                    "id": row[0],
                    "content": row[1],
                    "metadata": json.loads(row[2] or "{}"),
                    "created_at": row[3]
                })
            return results
    except Exception as e:
        logger.error(f"Legacy search failed: {e}")
        return []
```
SQL LIKE wildcard injection in legacy search.
The text_query is directly interpolated into the SQL LIKE clause without proper parameterization of the wildcard pattern itself. While SQLite parameterization is used, the % wildcards concatenated with user input could enable injection of additional wildcards.
🛡️ Proposed fix - escape SQL LIKE wildcards
```diff
 def _legacy_search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
     """Legacy search implementation for backward compatibility."""
     try:
         if hasattr(self, 'short_db'):
             conn = sqlite3.connect(self.short_db)
             text_query = query.get("text", "")
+            # Escape SQL LIKE wildcards in user input
+            text_query = text_query.replace("%", "\\%").replace("_", "\\_")
             rows = conn.execute(
-                "SELECT id, content, meta, created_at FROM short_mem WHERE content LIKE ? LIMIT ?",
+                "SELECT id, content, meta, created_at FROM short_mem WHERE content LIKE ? ESCAPE '\\' LIMIT ?",
                 (f"%{text_query}%", query.get("limit", 100))
             ).fetchall()
```

🧰 Tools
🪛 Ruff (0.14.14)
[warning] 479-479: Do not catch blind exception: Exception
(BLE001)
[warning] 480-480: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
```python
def store_short_term(self, text: str, metadata: Dict[str, Any] = None, **kwargs):
    """Store in short-term memory (legacy compatibility)."""
    key = str(time.time_ns())
    data = {
        "content": text,
        "metadata": metadata or {},
        "created_at": time.time()
    }

    if self.primary_storage:
        # Use async storage
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(self.store(key, data))
        finally:
            loop.close()
    else:
        return self._legacy_store(key, data)
```
Creating new event loops repeatedly can cause resource leaks and issues.
The pattern asyncio.new_event_loop() / loop.close() in every sync wrapper call is inefficient and can cause issues in environments that already have an event loop (e.g., Jupyter notebooks, web frameworks). Consider using asyncio.get_event_loop() with fallback or caching the loop.
🔧 Proposed fix using asyncio.run() or loop caching
```diff
+    def _run_async(self, coro):
+        """Run an async coroutine from sync context."""
+        try:
+            loop = asyncio.get_running_loop()
+        except RuntimeError:
+            # No running loop - safe to use asyncio.run()
+            return asyncio.run(coro)
+        else:
+            # Already in async context - use nest_asyncio or create task
+            import nest_asyncio
+            nest_asyncio.apply()
+            return loop.run_until_complete(coro)
+
     def store_short_term(self, text: str, metadata: Dict[str, Any] = None, **kwargs):
         """Store in short-term memory (legacy compatibility)."""
         key = str(time.time_ns())
         data = {
             "content": text,
             "metadata": metadata or {},
             "created_at": time.time()
         }
         if self.primary_storage:
-            # Use async storage
-            loop = asyncio.new_event_loop()
-            asyncio.set_event_loop(loop)
-            try:
-                return loop.run_until_complete(self.store(key, data))
-            finally:
-                loop.close()
+            return self._run_async(self.store(key, data))
         else:
             return self._legacy_store(key, data)
```

Alternatively, simpler fix using asyncio.run() (Python 3.7+):
```diff
         if self.primary_storage:
-            loop = asyncio.new_event_loop()
-            asyncio.set_event_loop(loop)
-            try:
-                return loop.run_until_complete(self.store(key, data))
-            finally:
-                loop.close()
+            return asyncio.run(self.store(key, data))
```

🧰 Tools
🪛 Ruff (0.14.14)
[warning] 500-500: PEP 484 prohibits implicit Optional
Convert to T | None
(RUF013)
[warning] 500-500: Unused method argument: kwargs
(ARG002)
```python
def store_long_term(self, text: str, metadata: Dict[str, Any] = None, **kwargs):
    """Store in long-term memory (legacy compatibility)."""
    return self.store_short_term(text, metadata, **kwargs)

def search_long_term(self, query: str, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
    """Search long-term memory (legacy compatibility)."""
    return self.search_short_term(query, limit, **kwargs)
```
store_long_term and search_long_term are aliases - document this behavior.
These methods simply delegate to short-term equivalents, meaning long-term and short-term memory are not differentiated. This may surprise users expecting separate storage. Consider adding a docstring note or actually differentiating them.
🧰 Tools
🪛 Ruff (0.14.14)
[warning] 535-535: PEP 484 prohibits implicit Optional
Convert to T | None
(RUF013)
🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py` around lines
535 - 541, Update the docstrings for store_long_term and search_long_term to
explicitly state they are aliases that delegate to store_short_term and
search_short_term (i.e., long-term is not differentiated from short-term in this
implementation), and add a short note for users about this behavior and
potential future differentiation so callers aren’t surprised; locate the methods
named store_long_term and search_long_term and modify their triple-quoted
docstrings accordingly.
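If the two tiers are kept on a shared backend, one low-cost way to actually differentiate them is a metadata tag; a toy, self-contained sketch (class and tag name are illustrative, not the PR's implementation):

```python
from typing import Any, Dict, List, Optional

class TaggedMemory:
    """Toy in-memory store showing how long-term records could be distinguished
    from short-term ones by a metadata tag rather than a separate backend."""

    def __init__(self):
        self._records: List[Dict[str, Any]] = []

    def store_short_term(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        self._records.append({"content": text, "metadata": metadata or {}})

    def store_long_term(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        tagged = dict(metadata or {})
        tagged["memory_type"] = "long_term"       # the only difference from short-term
        self.store_short_term(text, tagged)

    def search_long_term(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        hits = [r for r in self._records
                if query.lower() in r["content"].lower()
                and r["metadata"].get("memory_type") == "long_term"]
        return hits[:limit]

mem = TaggedMemory()
mem.store_short_term("ephemeral note")
mem.store_long_term("user prefers dark mode")
print(mem.search_long_term("dark"))   # only the long-term record is returned
```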
```python
    def search(self, query: str, user_id: Optional[str] = None, agent_id: Optional[str] = None,
               run_id: Optional[str] = None, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
        """Generic search method (legacy compatibility)."""
        if user_id:
            return self.search_user_memory(user_id, query, limit=limit, **kwargs)
        else:
            return self.search_short_term(query, limit=limit, **kwargs)
```
Unused parameters agent_id and run_id in search method.
These parameters are accepted but never used, which could mislead users into thinking filtering by agent/run is supported.
🛡️ Options: implement or document as not-yet-supported

Option 1 - Document that these are placeholders:

```diff
     def search(self, query: str, user_id: Optional[str] = None, agent_id: Optional[str] = None,
                run_id: Optional[str] = None, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
-        """Generic search method (legacy compatibility)."""
+        """Generic search method (legacy compatibility).
+
+        Note: agent_id and run_id parameters are accepted for API compatibility
+        but are not currently used for filtering.
+        """
```

Option 2 - Implement filtering:

```diff
         if user_id:
             return self.search_user_memory(user_id, query, limit=limit, **kwargs)
+        elif agent_id:
+            results = self.search_short_term(query, limit=20)
+            return [r for r in results if r.get("metadata", {}).get("agent_id") == agent_id][:limit]
         else:
             return self.search_short_term(query, limit=limit, **kwargs)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Option 1 applied:

```python
    def search(self, query: str, user_id: Optional[str] = None, agent_id: Optional[str] = None,
               run_id: Optional[str] = None, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
        """Generic search method (legacy compatibility).

        Note: agent_id and run_id parameters are accepted for API compatibility
        but are not currently used for filtering.
        """
        if user_id:
            return self.search_user_memory(user_id, query, limit=limit, **kwargs)
        else:
            return self.search_short_term(query, limit=limit, **kwargs)
```

Option 2 applied:

```python
    def search(self, query: str, user_id: Optional[str] = None, agent_id: Optional[str] = None,
               run_id: Optional[str] = None, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
        """Generic search method (legacy compatibility)."""
        if user_id:
            return self.search_user_memory(user_id, query, limit=limit, **kwargs)
        elif agent_id:
            results = self.search_short_term(query, limit=20)
            return [r for r in results if r.get("metadata", {}).get("agent_id") == agent_id][:limit]
        else:
            return self.search_short_term(query, limit=limit, **kwargs)
```
🧰 Tools
🪛 Ruff (0.14.14)
[warning] 566-566: Unused method argument: agent_id
(ARG002)
[warning] 567-567: Unused method argument: run_id
(ARG002)
🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py` around lines
566 - 572, The search method currently accepts agent_id and run_id but never
uses them; either implement filtering or document they're placeholders. To
implement, propagate agent_id and run_id into the underlying calls (pass them
from search into search_user_memory and search_short_term) and update those
functions (search_user_memory, search_short_term) to accept and apply
agent_id/run_id filters in their query logic; ensure tests and any DB/query
builders respect these new filters. Alternatively, if not supporting filtering
yet, remove agent_id/run_id from search signature or explicitly state in the
search docstring that agent_id and run_id are not supported and optionally emit
a debug log/warning when they are provided.
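If Option 2 were extended to honor run_id as well, the filtering could be factored into one metadata helper; a small self-contained sketch with made-up records (helper name and metadata fields are assumptions, not the PR's code):

```python
from typing import Any, Dict, List, Optional

def filter_by_metadata(results: List[Dict[str, Any]], limit: int,
                       **wanted: Optional[str]) -> List[Dict[str, Any]]:
    """Keep only results whose metadata matches every provided (non-None) field."""
    active = {k: v for k, v in wanted.items() if v is not None}
    matched = [r for r in results
               if all(r.get("metadata", {}).get(k) == v for k, v in active.items())]
    return matched[:limit]

candidates = [
    {"content": "plan draft",  "metadata": {"agent_id": "planner", "run_id": "r1"}},
    {"content": "tool output", "metadata": {"agent_id": "worker",  "run_id": "r1"}},
]
print(filter_by_metadata(candidates, 5, agent_id="planner", run_id="r1"))
# -> only the 'plan draft' record
```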
```python
    async def batch_write(self, records: Dict[str, Dict[str, Any]]) -> Dict[str, bool]:
        """Optimized batch write for DynamoDB."""
        await self._ensure_connection()

        loop = asyncio.get_event_loop()
        results = {}

        try:
            current_time = time.time()

            # DynamoDB batch_writer handles batching automatically
            with self.table.batch_writer() as batch:
                for key, data in records.items():
                    item = data.copy()
                    item["id"] = key
                    item["updated_at"] = current_time

                    if "created_at" not in item:
                        item["created_at"] = current_time

                    if self.ttl_attribute and self.ttl_attribute not in item:
                        ttl_seconds = 30 * 24 * 60 * 60
                        item[self.ttl_attribute] = int(time.time() + ttl_seconds)

                    item = self._prepare_item(item)

                    await loop.run_in_executor(
                        None,
                        batch.put_item,
                        {"Item": item}
                    )

                    results[key] = True

        except Exception as e:
            self.logger.error(f"Failed batch write: {e}")
            for key in records.keys():
                results[key] = False

        return results
```
Same issue in batch_write with sync context manager.
The batch_write method has the same issue as clear() - mixing synchronous batch_writer() context manager with async run_in_executor calls inside it.
🐛 Suggested fix

```diff
     async def batch_write(self, records: Dict[str, Dict[str, Any]]) -> Dict[str, bool]:
         """Optimized batch write for DynamoDB."""
         await self._ensure_connection()

         loop = asyncio.get_event_loop()
         results = {}

         try:
             current_time = time.time()
-            # DynamoDB batch_writer handles batching automatically
-            with self.table.batch_writer() as batch:
-                for key, data in records.items():
-                    item = data.copy()
-                    item["id"] = key
-                    item["updated_at"] = current_time
-
-                    if "created_at" not in item:
-                        item["created_at"] = current_time
-
-                    if self.ttl_attribute and self.ttl_attribute not in item:
-                        ttl_seconds = 30 * 24 * 60 * 60
-                        item[self.ttl_attribute] = int(time.time() + ttl_seconds)
-
-                    item = self._prepare_item(item)
-
-                    await loop.run_in_executor(
-                        None,
-                        batch.put_item,
-                        {"Item": item}
-                    )
-
-                    results[key] = True
+            # Prepare items
+            items_to_write = []
+            for key, data in records.items():
+                item = data.copy()
+                item["id"] = key
+                item["updated_at"] = current_time
+                if "created_at" not in item:
+                    item["created_at"] = current_time
+                if self.ttl_attribute and self.ttl_attribute not in item:
+                    ttl_seconds = 30 * 24 * 60 * 60
+                    item[self.ttl_attribute] = int(time.time() + ttl_seconds)
+                items_to_write.append((key, self._prepare_item(item)))
+
+            # Write all items synchronously within executor
+            def write_all():
+                with self.table.batch_writer() as batch:
+                    for key, item in items_to_write:
+                        batch.put_item(Item=item)
+
+            await loop.run_in_executor(None, write_all)
+
+            for key in records.keys():
+                results[key] = True
```

🧰 Tools
🪛 Ruff (0.14.14)
[warning] 478-478: Do not catch blind exception: Exception
(BLE001)
[warning] 479-479: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/storage/dynamodb_storage.py` around
lines 444 - 483, The batch_write method mixes the synchronous DynamoDB context
manager self.table.batch_writer() with asynchronous run_in_executor calls,
causing unsafe sync/async interplay; fix by collecting prepared items inside
batch_write (use self._prepare_item and set id/created_at/updated_at/ttl) into a
local list, then either (a) perform the entire batch_writer context and call
batch.put_item synchronously for each prepared item inside run_in_executor
(i.e., wrap a function that opens self.table.batch_writer() and loops
batch.put_item(items) and run that function in the executor), or (b) avoid
run_in_executor entirely and use synchronous code inside the async method by
executing the whole batching block in loop.run_in_executor; ensure you reference
batch_write, self.table.batch_writer, batch.put_item, self._prepare_item and
preserve results mapping per record and error handling.
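The essence of the fix is to keep boto3's synchronous batch_writer entirely inside one function and hand that whole function to the executor in a single call; a minimal sketch, assuming a boto3 DynamoDB Table resource and already-prepared items (function and variable names are illustrative):

```python
import asyncio

async def batch_put(table, items):
    """Write prepared items via boto3's synchronous batch_writer,
    offloaded to a worker thread so the event loop is never blocked."""
    def write_all():
        # Plain synchronous boto3 - no awaits inside the context manager.
        with table.batch_writer() as batch:
            for item in items:
                batch.put_item(Item=item)

    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, write_all)
    return {item["id"]: True for item in items}

# Usage (hypothetical): results = await batch_put(table, prepared_items)
```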
```python
    async def _ensure_connection(self):
        """Ensure MongoDB connection is established."""
        if not self._initialized:
            try:
                self.client = AsyncIOMotorClient(
                    self.url,
                    serverSelectionTimeoutMS=self.timeout
                )

                # Test connection
                await self.client.admin.command('ping')

                self.database = self.client[self.database_name]
                self.collection = self.database[self.collection_name]

                # Create indexes
                await self._create_indexes()

                self._initialized = True
                self.logger.info(f"Connected to MongoDB: {self.database_name}.{self.collection_name}")

            except ServerSelectionTimeoutError as e:
                self.logger.error(f"Failed to connect to MongoDB: {e}")
                raise
            except Exception as e:
                self.logger.error(f"Unexpected error connecting to MongoDB: {e}")
                raise
```
Consider thread-safety for lazy initialization.
The _ensure_connection method has a potential race condition if called concurrently from multiple async tasks - two tasks could both see _initialized = False and attempt to initialize simultaneously.
🔒 Proposed fix using asyncio.Lock

```diff
+        self._init_lock = asyncio.Lock()
         self._initialized = False

     async def _ensure_connection(self):
         """Ensure MongoDB connection is established."""
-        if not self._initialized:
+        if self._initialized:
+            return
+        async with self._init_lock:
+            if self._initialized:  # Double-check after acquiring lock
+                return
             try:
                 self.client = AsyncIOMotorClient(
```

🧰 Tools
🪛 Ruff (0.14.14)
[warning] 88-88: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
[warning] 91-91: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/storage/mongodb_storage.py` around lines
66 - 92, The lazy init in _ensure_connection is racy: add an asyncio.Lock (e.g.,
self._init_lock) created in the class __init__ to serialize initialization, then
wrap the init block in "async with self._init_lock:" and re-check
self._initialized inside the lock before creating AsyncIOMotorClient, running
await self.client.admin.command('ping'), setting self.database/collection,
calling await self._create_indexes(), and setting self._initialized; keep
existing exception handling and logger calls (self.logger) intact so only one
task performs initialization and others return once _initialized is True.
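As a self-contained illustration of the double-checked locking pattern the fix relies on (the class and the connect callable are illustrative, not the PR's code):

```python
import asyncio

class LazyResource:
    """Initialize an expensive async resource exactly once across concurrent callers."""

    def __init__(self, connect):
        self._connect = connect            # async callable that builds the resource
        self._init_lock = asyncio.Lock()
        self._resource = None

    async def get(self):
        if self._resource is not None:     # fast path once initialized, no lock taken
            return self._resource
        async with self._init_lock:
            if self._resource is None:     # re-check: another task may have won the race
                self._resource = await self._connect()
        return self._resource

# Usage (hypothetical): db = LazyResource(open_mongo); client = await db.get()
```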
```python
    async def list_keys(self, prefix: Optional[str] = None, limit: Optional[int] = None) -> List[str]:
        """List keys in storage."""
        await self._ensure_connection()

        try:
            # Build query for prefix filtering
            query = {}
            if prefix:
                query["_id"] = {"$regex": f"^{prefix}"}

            # Get only the _id field
            cursor = self.collection.find(query, {"_id": 1})

            if limit:
                cursor = cursor.limit(limit)

            cursor = cursor.sort("created_at", DESCENDING)

            keys = []
            async for doc in cursor:
                keys.append(doc["_id"])

            return keys
        except Exception as e:
            self.logger.error(f"Failed to list keys: {e}")
            return []
```
Potential regex injection in list_keys prefix filter.
User-provided prefix is directly interpolated into a regex pattern without escaping, which could cause unexpected behavior or errors if the prefix contains regex metacharacters.
🛡️ Proposed fix to escape regex metacharacters
```diff
+import re
+
 async def list_keys(self, prefix: Optional[str] = None, limit: Optional[int] = None) -> List[str]:
     """List keys in storage."""
     await self._ensure_connection()

     try:
         # Build query for prefix filtering
         query = {}
         if prefix:
-            query["_id"] = {"$regex": f"^{prefix}"}
+            query["_id"] = {"$regex": f"^{re.escape(prefix)}"}
```

🧰 Tools
🪛 Ruff (0.14.14)
[warning] 240-240: Consider moving this statement to an else block
(TRY300)
[warning] 241-241: Do not catch blind exception: Exception
(BLE001)
[warning] 242-242: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/storage/mongodb_storage.py` around lines
218 - 243, The list_keys method is vulnerable to regex injection because the
user-provided prefix is interpolated directly into a regex; update the list_keys
implementation (function name list_keys in mongodb_storage.py) to escape regex
metacharacters before building the query (e.g., use Python's re.escape on the
prefix) and then use the escaped value in the {"$regex": f"^{escaped_prefix}"}
filter; add the necessary import (re) if missing and keep the rest of the
query/limit/sorting logic unchanged.
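A quick, runnable illustration of why the escaping matters for prefix filters (the prefix and keys are made up):

```python
import re

prefix = "user.1+cache"                  # contains regex metacharacters '.' and '+'
raw = f"^{prefix}"
safe = f"^{re.escape(prefix)}"

print(bool(re.match(raw,  "userX11cache:42")))   # True  - '.' and '+' act as regex operators
print(bool(re.match(safe, "userX11cache:42")))   # False - only the literal prefix matches
print(bool(re.match(safe, "user.1+cache:42")))   # True  - intended match still works
```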
```python
        except (RedisError, json.JSONEncodeError) as e:
            self.logger.error(f"Failed to write key {key}: {e}")
            return False
```
json.JSONEncodeError does not exist in the standard library.
The exception type json.JSONEncodeError is not a valid Python exception. The json module only defines JSONDecodeError. Serialization errors from json.dumps() would raise TypeError or ValueError.
🐛 Proposed fix

```diff
-        except (RedisError, json.JSONEncodeError) as e:
+        except (RedisError, TypeError, ValueError) as e:
```

Also applies to: 371-374
🧰 Tools
🪛 Ruff (0.14.14)
[warning] 202-202: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/storage/redis_storage.py` around lines
201 - 203, The except block currently catches a non-existent
json.JSONEncodeError; replace that with concrete serialization/runtime
exceptions thrown by json.dumps (e.g., TypeError and ValueError) so the handler
becomes except (RedisError, TypeError, ValueError) as e; make the same change
for the similar except block around lines 371-374 to ensure both write/serialize
failure paths log and return False correctly.
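For context, a quick check of what json.dumps() actually raises, which is what the corrected except clause needs to catch (the sample values are made up):

```python
import json

try:
    json.dumps({"key": object()})                # not JSON-serializable
except TypeError as e:
    print(f"serialization failed: {e}")

try:
    json.dumps(float("nan"), allow_nan=False)    # rejected when allow_nan=False
except ValueError as e:
    print(f"serialization failed: {e}")
```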
```python
        # Initialize database
        asyncio.create_task(self._init_db()) if asyncio.get_event_loop().is_running() else self._init_db_sync()
```
Potential issue with asyncio.create_task in __init__.
The condition asyncio.get_event_loop().is_running() can raise DeprecationWarning in Python 3.10+ and may not work as expected. Using asyncio.create_task in __init__ can also cause the task to be garbage collected before completion if not awaited.
🛠️ Suggested safer approach
```diff
-        # Initialize database
-        asyncio.create_task(self._init_db()) if asyncio.get_event_loop().is_running() else self._init_db_sync()
+        # Initialize database synchronously - async init can be called separately if needed
+        self._init_db_sync()
```

Alternatively, consider providing an explicit async factory method for async initialization contexts.
🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/storage/sqlite_storage.py` around lines
44 - 45, The __init__ currently uses asyncio.get_event_loop().is_running() and
asyncio.create_task(self._init_db()) which can raise DeprecationWarning and can
leave the task unreferenced/garbage-collected; change this by removing the
create_task call from __init__ and provide an explicit async factory or
initializer (e.g., an async classmethod like async_create or an async_init
method) that awaits self._init_db(), keep the existing synchronous fallback
_init_db_sync for sync contexts, and update callers to use the async factory
when they need async initialization; reference the constructors and the methods
_init_db and _init_db_sync when making this change.
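One shape the suggested async factory could take; the class and method names here are illustrative, not the PR's API:

```python
import asyncio

class SQLiteLikeStorage:
    """Storage whose async schema setup never runs from __init__."""

    def __init__(self, path: str):
        self.path = path
        self._initialized = False          # no event-loop interaction in the constructor

    @classmethod
    async def create(cls, path: str) -> "SQLiteLikeStorage":
        """Async factory: construct, then await schema initialization."""
        self = cls(path)
        await self._init_db()
        return self

    async def _init_db(self):
        await asyncio.sleep(0)             # placeholder for the real async schema setup
        self._initialized = True

# Sync callers construct normally and rely on a lazy or sync init path;
# async callers: storage = await SQLiteLikeStorage.create("memory.db")
```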


Summary
✅ Unified Storage System Implementation
✅ Key Features
✅ New API Examples