
Conversation


@MervinPraison MervinPraison commented Jul 17, 2025

Summary

Unified Storage System Implementation

  • Created BaseStorage abstract class with async/sync support
  • Implemented 8 storage backends: SQLite, MongoDB, PostgreSQL, Redis, DynamoDB, S3, GCS, and Azure
  • Enhanced Memory class with provider selection API
  • Maintained full backward compatibility

Key Features

  • Minimal code changes on client side
  • Configuration-first approach
  • Multi-storage support
  • Quality-based storage decisions
  • Connection pooling and performance optimization

New API Examples

# MongoDB storage
memory 
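
As a rough illustration of the configuration-first API (the constructor arguments and config keys below are assumptions, not the confirmed API), selecting a MongoDB backend might look like:

```python
# Hypothetical sketch of configuration-first provider selection.
# "provider", "config", and the key names are assumed for illustration only.
from praisonaiagents import Memory

memory = Memory(config={
    "provider": "mongodb",  # assumed: selects the MongoDB backend
    "config": {
        "connection_string": "mongodb://localhost:27017",
        "database": "praison",
    },
})

# The legacy synchronous API is described as preserved, e.g.:
memory.store_short_term("User prefers concise answers", metadata={"topic": "style"})
```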

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **Medium Risk**
> Touches core persistence/memory plumbing and adds multiple optional dependency paths; misconfiguration or event-loop/async wrapper behavior could cause runtime regressions across different environments.
> 
> **Overview**
> Adds a new unified storage layer (`storage` package) with an async `BaseStorage` interface and concrete backends for `SQLite`, `MongoDB`, `PostgreSQL`, `Redis`, `DynamoDB`, and cloud object stores (`S3`/`GCS`/`Azure`).
> 
> Updates `Memory` to preferentially use a new `enhanced_memory` implementation that can route reads/writes/searches/deletes through a configured primary backend plus optional cache, while preserving legacy synchronous `store_*`/`search_*` APIs via wrappers and SQLite fallbacks. The package `__init__` now exports `Memory` from `praisonaiagents.memory` and conditionally exposes storage backend classes when optional dependencies are installed.
> 
> <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit f277d8613e07cee2dc3692cf9bd0712fee81ec26. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **New Features**
  * Added support for multiple storage backends: SQLite, MongoDB, PostgreSQL, Redis, DynamoDB, and cloud platforms (AWS S3, Google Cloud Storage, Azure).
  * Introduced EnhancedMemory system with a unified async API for flexible memory and storage operations across multiple backends.
  * Implemented optional dependency handling with graceful fallback when storage backends are not installed.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

…gents

- Add unified storage interface (BaseStorage) with async/await support
- Implement 8+ storage backends: SQLite, MongoDB, PostgreSQL, Redis, DynamoDB, S3, GCS, Azure
- Enhance Memory class with backward-compatible multi-storage support
- Add configuration-first approach for easy provider switching
- Support primary + cache storage patterns
- Maintain full backward compatibility with existing code
- Add graceful dependency handling for optional backends

Resolves #971

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

coderabbitai bot commented Jul 17, 2025

📝 Walkthrough

Walkthrough

This PR introduces a comprehensive pluggable storage architecture with multiple backend implementations (SQLite, PostgreSQL, MongoDB, Redis, DynamoDB, S3, GCS, Azure), an optional EnhancedMemory wrapper providing unified async APIs with optional caching support, and updates the Memory import system with graceful fallback handling for missing optional dependencies.

Changes

Cohort / File(s) Summary
Core Package Exports
src/praisonai-agents/praisonaiagents/__init__.py
Modified Memory import path from submodule to direct import; added conditional storage backend imports with availability flags (try/except); exposed storage classes in public API (__all__).
Memory Module Integration
src/praisonai-agents/praisonaiagents/memory/__init__.py, src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py
Added conditional import logic to select EnhancedMemory when available, falling back to original Memory; introduced new EnhancedMemory class with unified async store/retrieve/search_unified/delete_unified methods, legacy sync wrappers (store_short_term, search_long_term, etc.), and optional cache-backed architecture.
Storage Base Infrastructure
src/praisonai-agents/praisonaiagents/storage/__init__.py, src/praisonai-agents/praisonaiagents/storage/base.py
Created storage package with BaseStorage abstract class defining unified async interface (read, write, delete, search, list_keys, clear); added batch operations and utility methods; conditional imports for optional backends with per-backend availability flags.
Database Storage Backends
src/praisonai-agents/praisonaiagents/storage/sqlite_storage.py, src/praisonai-agents/praisonaiagents/storage/postgresql_storage.py, src/praisonai-agents/praisonaiagents/storage/mongodb_storage.py, src/praisonai-agents/praisonaiagents/storage/redis_storage.py, src/praisonai-agents/praisonaiagents/storage/dynamodb_storage.py
Implemented database-backed storage with async CRUD, batch operations, search filtering (text, metadata, time ranges), TTL support, and SDK-specific connection management; each backend handles its own initialization, error handling, and resource cleanup.
Cloud Storage Backends
src/praisonai-agents/praisonaiagents/storage/cloud_storage.py
Implemented three cloud storage providers (S3Storage, GCSStorage, AzureStorage) sharing consistent interface with key namespacing, bucket/container verification, async executor-based operations, and per-provider SDK integration (boto3, google-cloud-storage, azure-storage-blob).

Sequence Diagram

sequenceDiagram
    participant App as Application
    participant EM as EnhancedMemory
    participant Cache as Cache Backend
    participant Storage as Storage Backend<br/>(SQLite/Mongo/Redis/etc)
    
    App->>EM: store(key, data, use_cache=True)
    EM->>Cache: write to cache
    Cache-->>EM: cache write complete
    EM->>Storage: async write(key, data)
    Storage-->>EM: write result
    EM-->>App: return success
    
    App->>EM: retrieve(key, check_cache=True)
    EM->>Cache: check_cache(key)
    Cache-->>EM: found in cache
    EM-->>App: return cached data
    
    App->>EM: retrieve(key, check_cache=True)
    EM->>Cache: check_cache(key)
    Cache-->>EM: cache miss
    EM->>Storage: async read(key)
    Storage-->>EM: data from storage
    EM->>Cache: write to cache
    Cache-->>EM: cache write complete
    EM-->>App: return data
    
    App->>EM: search_unified(query)
    EM->>Storage: async search(query)
    Storage-->>EM: filtered results
    EM-->>App: return results

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • #610: Modifies Memory import paths and package-level exports in __init__.py, directly overlapping with this PR's Memory import refactoring.
  • #994: Implements MongoDB-backed storage and memory functionality with similar storage/memory component architecture.

Suggested labels

enhancement, storage, memory, async, database, cloud-storage

Poem

🐰 A cottage of backends now stands side by side,
Where memory flows async, with cache as its guide,
From Redis to Mongo, from S3 clouds above,
Storage adapts gracefully—one unified love!
With optional backends and fallbacks so bright,
This new architecture feels perfectly right! 🌿

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped: CodeRabbit’s high-level summary is enabled. |
| Title Check | ✅ Passed | The title accurately summarizes the main objective of the changeset: a unified storage system with multiple new backend classes and an enhanced memory system. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 99.42%, above the required threshold of 80.00%. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



@gemini-code-assist gemini-code-assist bot left a comment


Summary of Changes

Hello @MervinPraison, I'm Gemini Code Assist<sup>1</sup>! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly upgrades the data persistence capabilities of the praisonai-agents library by implementing a robust, unified storage system. It abstracts away the complexities of different database and cloud storage technologies behind a common asynchronous interface, offering developers a wide range of scalable and performant options for managing agent memory and data. The design ensures seamless integration and future extensibility while preserving compatibility with existing codebases.

Highlights

  • Unified Storage Abstraction: I've introduced a new BaseStorage abstract class that defines a common asynchronous interface for all storage operations (read, write, delete, search, list keys, clear, and batch operations). This provides a consistent API for interacting with various data stores (see the sketch after this list).
  • Expanded Storage Backend Support: This PR adds comprehensive support for 8 new storage backends: MongoDB, PostgreSQL, Redis, DynamoDB, AWS S3, Google Cloud Storage, Azure Blob Storage, and an enhanced SQLite implementation. Each backend adheres to the new BaseStorage interface, allowing for flexible configuration.
  • Enhanced Memory Management: The core Memory class has been refactored into EnhancedMemory, which now leverages the new unified storage system. It supports configuring both a primary storage and an optional cache storage, enabling more sophisticated memory strategies.
  • Backward Compatibility: Crucially, the changes maintain full backward compatibility. The memory/__init__.py now intelligently imports EnhancedMemory if the necessary storage dependencies are available, otherwise falling back to the original Memory implementation. Existing synchronous memory methods are wrapped to utilize the new asynchronous storage backends.
  • Configuration-First Approach: The new storage system is designed with a configuration-first approach, allowing users to easily select and configure their preferred storage providers with minimal code changes on the client side.
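
For reference, here is a minimal sketch of what such a BaseStorage-style interface could look like; the method names follow the description above, while the exact signatures and return types are assumptions, not the actual code.

```python
# Minimal sketch of a BaseStorage-style interface (signatures are assumptions).
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class BaseStorage(ABC):
    """Unified async interface that each backend (SQLite, MongoDB, S3, ...) would implement."""

    @abstractmethod
    async def read(self, key: str) -> Optional[Dict[str, Any]]: ...

    @abstractmethod
    async def write(self, key: str, data: Dict[str, Any]) -> bool: ...

    @abstractmethod
    async def delete(self, key: str) -> bool: ...

    @abstractmethod
    async def search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]: ...

    @abstractmethod
    async def list_keys(self, prefix: str = "") -> List[str]: ...

    @abstractmethod
    async def clear(self) -> bool: ...
```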
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

| Feature | Command | Description |
| --- | --- | --- |
| Code Review | `/gemini review` | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | `/gemini summary` | Provides a summary of the current pull request in its current state. |
| Comment | `@gemini-code-assist` | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | `/gemini help` | Displays a list of available commands. |

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a unified storage system with multiple backends, enhancing modularity and extensibility. The review focuses on robustness, efficiency, and maintainability: correcting asyncio event loop management, fixing a missing import, addressing performance issues in the cloud storage and Redis backends, and making database operations more robust.

Comment on lines +511 to +516
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    return loop.run_until_complete(self.store(key, data))
finally:
    loop.close()

critical

Creating a new event loop with asyncio.new_event_loop() for each synchronous call is inefficient and can cause issues. It will fail if an event loop is already running in the thread, which is common in async frameworks or in Jupyter notebooks.

A safer approach for running an async function from a sync context is asyncio.run(). While it also has limitations (it can't be called when a loop is already running), it's an improvement over managing loops manually.

This issue is present in store_short_term, search_short_term, reset_short_term, and reset_all.

Suggested change
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- try:
-     return loop.run_until_complete(self.store(key, data))
- finally:
-     loop.close()
+ return asyncio.run(self.store(key, data))


try:
    import boto3
    from boto3.dynamodb.conditions import Key, Attr

critical

This line will raise a NameError at runtime because TypeDeserializer is not imported. You need to add from boto3.dynamodb.types import TypeDeserializer at the top of the file to resolve this.
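
For instance, the guarded import block could gain the missing import along these lines (the surrounding fallback handling is assumed, not taken from the PR):

```python
try:
    import boto3
    from boto3.dynamodb.conditions import Key, Attr
    from boto3.dynamodb.types import TypeDeserializer  # the missing import flagged above
except ImportError:  # assumed: whatever fallback the module already uses stays unchanged
    ...
```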

Comment on lines +248 to +284
async def search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Search for records matching the query."""
    await self._ensure_connection()

    loop = asyncio.get_event_loop()

    try:
        # List all objects with prefix
        response = await loop.run_in_executor(
            None,
            self.s3_client.list_objects_v2,
            {"Bucket": self.bucket, "Prefix": self.prefix}
        )

        results = []
        limit = query.get("limit", 100)

        for obj in response.get("Contents", []):
            if len(results) >= limit:
                break

            s3_key = obj["Key"]
            key = self._strip_prefix(s3_key)

            # Read and filter object
            record = await self.read(key)
            if record and self._matches_query(record, query):
                results.append(record)

        # Sort by updated_at descending
        results.sort(key=lambda x: x.get("updated_at", 0), reverse=True)

        return results

    except Exception as e:
        self.logger.error(f"Failed to search: {e}")
        return []

high

The search implementation for S3 is inefficient. It lists all objects under the prefix and then reads each object's full content to perform client-side filtering. For a large number of objects, this will be slow and can incur significant costs due to the high number of GET requests.

Consider using S3 Select for server-side filtering to improve performance. At a minimum, this performance limitation should be clearly documented in the docstring to warn users.

This same issue exists for GCSStorage and AzureStorage as well.
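
A hedged sketch of the S3 Select approach (the bucket/key names and the assumption that each object is a single JSON document are illustrative, not taken from the PR):

```python
# Sketch: server-side filtering with S3 Select instead of one GET per object.
resp = s3_client.select_object_content(
    Bucket=bucket,
    Key=s3_key,
    ExpressionType="SQL",
    Expression="SELECT * FROM S3Object s WHERE s.content LIKE '%search text%'",
    InputSerialization={"JSON": {"Type": "DOCUMENT"}},
    OutputSerialization={"JSON": {}},
)

# The response payload is an event stream; collect the matching records.
for event in resp["Payload"]:
    if "Records" in event:
        matched_json = event["Records"]["Payload"].decode("utf-8")
```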

Comment on lines +347 to +373
async def clear(self) -> bool:
    """Clear all records from storage."""
    await self._ensure_connection()

    loop = asyncio.get_event_loop()

    try:
        # List all objects
        response = await loop.run_in_executor(
            None,
            self.s3_client.list_objects_v2,
            {"Bucket": self.bucket, "Prefix": self.prefix}
        )

        # Delete all objects
        for obj in response.get("Contents", []):
            await loop.run_in_executor(
                None,
                self.s3_client.delete_object,
                {"Bucket": self.bucket, "Key": obj["Key"]}
            )

        return True

    except Exception as e:
        self.logger.error(f"Failed to clear storage: {e}")
        return False

high

The clear method deletes objects one by one in a loop, which is inefficient for a large number of objects. Cloud storage providers offer batch deletion APIs that are faster and more cost-effective.

For S3, you should use s3_client.delete_objects(). This same issue applies to GCSStorage (use bucket.delete_blobs()) and AzureStorage (use container_client.delete_blobs()).

    async def clear(self) -> bool:
        """Clear all records from storage."""
        await self._ensure_connection()
        
        loop = asyncio.get_event_loop()
        
        try:
            # Use paginator to handle large number of objects
            paginator = self.s3_client.get_paginator('list_objects_v2')
            pages = paginator.paginate(Bucket=self.bucket, Prefix=self.prefix)

            for page in pages:
                if 'Contents' not in page:
                    continue
                
                objects_to_delete = [{'Key': obj['Key']} for obj in page['Contents']]
                if not objects_to_delete:
                    continue

                await loop.run_in_executor(
                    None,
                    self.s3_client.delete_objects,
                    {'Bucket': self.bucket, 'Delete': {'Objects': objects_to_delete}}
                )
            
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to clear storage: {e}")
            return False

Comment on lines +217 to +264
async def search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Search for records matching the query.

    Note: Redis doesn't have native search capabilities like MongoDB/PostgreSQL.
    This implementation scans all keys and filters client-side, which may be slow
    for large datasets. Consider using RedisSearch module for production use.
    """
    await self._ensure_connection()

    try:
        # Get all keys with our prefix
        pattern = f"{self.key_prefix}*"
        keys = await self.redis.keys(pattern)

        if not keys:
            return []

        # Get all records
        raw_data = await self.redis.mget(keys)
        results = []

        # Process and filter records
        for i, data in enumerate(raw_data):
            if data:
                try:
                    json_str = self._decompress_data(data)
                    record = json.loads(json_str)
                    record["id"] = self._strip_prefix(keys[i].decode())

                    # Apply filters
                    if self._matches_query(record, query):
                        results.append(record)

                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    self.logger.error(f"Failed to decode record: {e}")
                    continue

        # Sort by updated_at descending
        results.sort(key=lambda x: x.get("updated_at", 0), reverse=True)

        # Apply limit
        limit = query.get("limit", 100)
        return results[:limit]

    except RedisError as e:
        self.logger.error(f"Failed to search: {e}")
        return []

high

The search method uses the KEYS command, which is a blocking operation that can severely degrade Redis performance in production, especially with a large number of keys. It's strongly recommended to use SCAN instead, which iterates through the keyspace without blocking the server.

This same issue applies to the clear and count methods.

    async def search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Search for records matching the query.
        
        Note: This implementation scans keys and filters client-side, which may be slow
        for large datasets. Consider using RedisSearch module for production use.
        """
        await self._ensure_connection()
        
        try:
            results = []
            limit = query.get("limit", 100)
            
            # Use SCAN instead of KEYS to avoid blocking the server
            async for key_bytes in self.redis.scan_iter(match=f"{self.key_prefix}*"):
                data = await self.redis.get(key_bytes)
                if data:
                    try:
                        json_str = self._decompress_data(data)
                        record = json.loads(json_str)
                        record["id"] = self._strip_prefix(key_bytes.decode())
                        
                        if self._matches_query(record, query):
                            results.append(record)
                            
                    except (json.JSONDecodeError, UnicodeDecodeError) as e:
                        self.logger.error(f"Failed to decode record: {e}")
                        continue
            
            results.sort(key=lambda x: x.get("updated_at", 0), reverse=True)
            
            return results[:limit]
            
        except RedisError as e:
            self.logger.error(f"Failed to search: {e}")
            return []

Comment on lines +210 to +216
result = await conn.execute(
    f"DELETE FROM {self.schema}.{self.table_name} WHERE id = $1",
    key
)
# Extract affected rows from result string like "DELETE 1"
affected_rows = int(result.split()[-1]) if result.split() else 0
return affected_rows > 0

medium

Parsing the status string returned by conn.execute (e.g., "DELETE 1") to determine the number of affected rows is brittle, as the format is not guaranteed to be stable across asyncpg versions.

A more robust approach is to use the RETURNING clause to confirm the deletion.

Suggested change
- result = await conn.execute(
-     f"DELETE FROM {self.schema}.{self.table_name} WHERE id = $1",
-     key
- )
- # Extract affected rows from result string like "DELETE 1"
- affected_rows = int(result.split()[-1]) if result.split() else 0
- return affected_rows > 0
+ status = await conn.fetchval(
+     f"DELETE FROM {self.schema}.{self.table_name} WHERE id = $1 RETURNING id",
+     key
+ )
+ return status is not None


@cursor cursor bot left a comment


Bug: Async Initialization Flaws in SQLiteStorage

The SQLiteStorage constructor's asynchronous initialization is flawed. The check asyncio.get_event_loop().is_running() is problematic: asyncio.get_event_loop() is deprecated and can raise RuntimeError if no loop exists, while is_running() is not a valid method on loop objects, causing an AttributeError. Additionally, the asyncio.create_task() call is neither awaited nor stored, leading to a race condition where the constructor returns before database initialization completes. This can cause subsequent operations to fail or the task to be garbage collected. The initialization should use asyncio.get_running_loop() with proper error handling and ensure the task is managed.

src/praisonai-agents/praisonaiagents/storage/sqlite_storage.py#L44-L46

# Initialize database
asyncio.create_task(self._init_db()) if asyncio.get_event_loop().is_running() else self._init_db_sync()
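
One possible shape of a fix, sketched under the assumption that an `_init_task` attribute may be added (the name is introduced here for illustration):

```python
# Sketch: decide between sync and async init without the deprecated get_event_loop(),
# and keep a reference to the scheduled task so it is not garbage collected.
try:
    loop = asyncio.get_running_loop()
except RuntimeError:
    # No running loop: initialize synchronously.
    self._init_db_sync()
else:
    # Running loop: schedule async init and hold on to the task.
    self._init_task = loop.create_task(self._init_db())
```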



Bug: Async Calls in Sync Batch Writer

The DynamoDBStorage class incorrectly wraps batch.put_item and batch.delete_item calls with await loop.run_in_executor inside the synchronous boto3.table.batch_writer() context. This misuses the batch writer, which is designed for synchronous operations, leading to potential resource leaks, inconsistent batch behavior, and thread safety concerns.

src/praisonai-agents/praisonaiagents/storage/dynamodb_storage.py#L429-L437

# Delete all items
with self.table.batch_writer() as batch:
    for item in response.get("Items", []):
        await loop.run_in_executor(
            None,
            batch.delete_item,
            {"Key": {"id": item["id"]}}
        )

src/praisonai-agents/praisonaiagents/storage/dynamodb_storage.py#L469-L474

await loop.run_in_executor(
    None,
    batch.put_item,
    {"Item": item}
)



Bug: Async/Sync Bridging Causes Event Loop Conflicts

The EnhancedMemory class contains an incorrect async/sync bridging pattern in methods like store_short_term, search_short_term, reset_short_term, and reset_all. These methods create a new event loop with asyncio.new_event_loop() and set it as the current loop with asyncio.set_event_loop(loop). This can interfere with existing event loops, cause a RuntimeError if called within an already running async context, and does not restore the original event loop, potentially breaking other async code. The pattern should be replaced with asyncio.run() or proper handling of existing event loops.

src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py#L510-L517

    # Use async storage
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(self.store(key, data))
    finally:
        loop.close()
else:



Bug: S3Storage Class Fails Boto3 API Calls

The S3Storage class incorrectly calls boto3 methods via asyncio.run_in_executor. It passes a dictionary (e.g., {'Bucket': self.bucket}) as a single positional argument, but boto3 methods expect keyword arguments (e.g., Bucket=self.bucket). This error affects all boto3 API calls within the class (e.g., head_bucket, get_object, put_object, list_objects_v2), causing runtime errors.

src/praisonai-agents/praisonaiagents/storage/cloud_storage.py#L144-L149

try:
    await loop.run_in_executor(
        None,
        self.s3_client.head_bucket,
        {"Bucket": self.bucket}
    )
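
One common workaround is to bind the keyword arguments with functools.partial before handing the call to the executor; a sketch using the names from the snippet above:

```python
import functools

# Sketch: run_in_executor only forwards positional args, so bind kwargs first.
await loop.run_in_executor(
    None,
    functools.partial(self.s3_client.head_bucket, Bucket=self.bucket),
)
```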



Bug: SQL Injection via JSONB Key Interpolation

The PostgreSQLStorage.search method is vulnerable to SQL injection when searching by metadata. The PostgreSQL ->> operator requires the JSONB key to be a literal string, not a parameterized value. The current implementation attempts to parameterize the key (e.g., metadata ->> $1), which results in invalid SQL. To execute a valid query, the key must be directly interpolated into the SQL string. Since this key originates from user input (query["metadata"]), direct interpolation creates a SQL injection vulnerability.

src/praisonai-agents/praisonaiagents/storage/postgresql_storage.py#L246-L251

for key, value in query["metadata"].items():
    param_count += 1
    where_conditions.append(f"metadata ->> ${param_count} = ${param_count + 1}")
    params.extend([key, str(value)])
    param_count += 1
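
If the key must be embedded in the SQL text, one possible mitigation (a sketch only, not the project's code) is to whitelist the key and keep the value parameterized:

```python
import re

# Sketch: allow only simple identifier-like metadata keys before interpolating them.
for key, value in query["metadata"].items():
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", key):
        raise ValueError(f"Unsupported metadata key: {key!r}")
    param_count += 1
    where_conditions.append(f"metadata ->> '{key}' = ${param_count}")
    params.append(str(value))
```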



Bug: Circular Import Issue in Memory Factory

A circular import bug exists in the Memory factory function within enhanced_memory.py. When STORAGE_AVAILABLE is False, the function attempts to import Memory from .memory (i.e., memory/__init__.py), which itself imports from enhanced_memory.py, leading to a circular dependency and an ImportError at runtime.

src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py#L641-L647

"""
if STORAGE_AVAILABLE:
return EnhancedMemory(config, verbose)
else:
# Import and return original Memory class
from .memory import Memory as OriginalMemory
return OriginalMemory(config, verbose)




@MervinPraison
Owner Author

@claude review this pull request, do a detailed analysis, and fix the issues if the existing code doesn't already implement the solution. Make sure backward compatibility is preserved and no existing features are removed. After making those changes, review the applied changes again. Use @web to search if you don't know something, or to find the latest documentation or version. Run the code if you think you need to test it. Start with minimal code changes if any changes are required.


github-actions bot commented Jul 17, 2025

Claude encountered an error — View job


I'll analyze this and get back to you.


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 6 potential issues.


    None,
    self.s3_client.get_object,
    {"Bucket": self.bucket, "Key": s3_key}
)

Incorrect run_in_executor usage with dictionary arguments

High Severity

The run_in_executor calls throughout S3Storage and DynamoDBStorage incorrectly pass a dictionary as a positional argument to boto3 methods. run_in_executor(executor, func, *args) passes positional args, but boto3 methods like get_object, put_object, head_bucket, etc. expect keyword arguments (e.g., Bucket=..., Key=...). Passing {"Bucket": self.bucket, "Key": s3_key} as a single positional arg causes the boto3 methods to fail with a TypeError at runtime. This affects all storage operations in both S3 and DynamoDB backends.

Additional Locations (2)


# Create table
self.table = await loop.run_in_executor(
    None, self.dynamodb.create_table, **table_kwargs
)

Invalid kwargs unpacking in run_in_executor call

High Severity

The run_in_executor call uses **table_kwargs syntax which is invalid. run_in_executor only accepts positional arguments after the function, not keyword arguments. The **table_kwargs will be unpacked as arguments to run_in_executor itself (which doesn't accept these kwargs), rather than being passed to create_table. This will raise a TypeError when creating a new DynamoDB table.
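
The same functools.partial binding sketched earlier would apply here as well (names as in the snippet above):

```python
import functools

# Sketch: bind create_table's keyword arguments before passing it to the executor.
self.table = await loop.run_in_executor(
    None,
    functools.partial(self.dynamodb.create_table, **table_kwargs),
)
```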


elif provider_lower == "gcs":
    return GCSStorage(config)
elif provider_lower == "azure":
    return AzureStorage(config)

Storage class instantiation fails when dependency unavailable

High Severity

When a user configures a storage provider (e.g., "mongodb", "postgresql") but the required dependency isn't installed, the corresponding storage class variable will be None (from the storage module's fallback). Calling MongoDBStorage(config) when MongoDBStorage is None raises TypeError: 'NoneType' object is not callable, giving users no indication of the actual problem (missing dependency). The code lacks checks to verify the storage class is available before instantiation.


if self.project:
    client_kwargs["project"] = self.project
if self.credentials_path:
    client_kwargs["credentials"] = self.credentials_path

GCS credentials parameter expects object not path

Medium Severity

The GCSStorage._ensure_connection method passes self.credentials_path (a string file path) directly to the gcs.Client constructor's credentials parameter. The GCS Client expects a google.auth.credentials.Credentials object, not a string path. Using a path string will cause the client to misinterpret the credentials and fail. The path needs to be loaded using service_account.Credentials.from_service_account_file() first.
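
A sketch of loading the service-account file first, assuming self.credentials_path points at a service-account JSON key:

```python
from google.oauth2 import service_account

# Sketch: turn the file path into a Credentials object before building the client.
if self.credentials_path:
    client_kwargs["credentials"] = service_account.Credentials.from_service_account_file(
        self.credentials_path
    )
```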


    blob.upload_from_string,
    content,
    content_type="application/json"
)

GCS upload_from_string keyword arg fails in run_in_executor

High Severity

The run_in_executor call passes content_type="application/json" as a keyword argument, but run_in_executor only accepts positional arguments for the target function (after executor and func). The content_type keyword will be interpreted as an argument to run_in_executor itself, causing a TypeError. This affects all GCS write operations.
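
The same functools.partial pattern is one way to keep content_type as a bound keyword argument; a sketch using the names from the snippet above:

```python
import functools

# Sketch: bind content_type to the blob upload call rather than to run_in_executor.
await loop.run_in_executor(
    None,
    functools.partial(blob.upload_from_string, content, content_type="application/json"),
)
```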


os.makedirs(os.path.dirname(self.db_path) or ".", exist_ok=True)

# Initialize database
asyncio.create_task(self._init_db()) if asyncio.get_event_loop().is_running() else self._init_db_sync()

SQLiteStorage init crashes on Python 3.10+ without loop

High Severity

The SQLiteStorage.__init__ calls asyncio.get_event_loop().is_running() to decide between sync and async initialization. In Python 3.10+, calling get_event_loop() when no event loop exists emits a DeprecationWarning, and in Python 3.12+ this raises a RuntimeError. Since SQLiteStorage is typically instantiated synchronously from EnhancedMemory.__init__, this will crash on Python 3.12+ or cause warnings on earlier versions.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 14

🤖 Fix all issues with AI agents
In `@src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py`:
- Around line 535-541: Update the docstrings for store_long_term and
search_long_term to explicitly state they are aliases that delegate to
store_short_term and search_short_term (i.e., long-term is not differentiated
from short-term in this implementation), and add a short note for users about
this behavior and potential future differentiation so callers aren’t surprised;
locate the methods named store_long_term and search_long_term and modify their
triple-quoted docstrings accordingly.
- Around line 157-178: The _create_storage_backend function currently
instantiates backend classes directly which can be None when optional backends
are missing; update each branch (e.g., SQLiteStorage, MongoDBStorage,
PostgreSQLStorage, RedisStorage, DynamoDBStorage, S3Storage, GCSStorage,
AzureStorage) to first check whether the class reference is not None and if it
is None raise a clear ImportError or ValueError (e.g., "Backend X is unavailable
— install optional dependency Y") otherwise instantiate and return the backend
with config; keep the existing provider_lower checks and the final
unsupported-provider ValueError.
- Around line 566-572: The search method currently accepts agent_id and run_id
but never uses them; either implement filtering or document they're
placeholders. To implement, propagate agent_id and run_id into the underlying
calls (pass them from search into search_user_memory and search_short_term) and
update those functions (search_user_memory, search_short_term) to accept and
apply agent_id/run_id filters in their query logic; ensure tests and any
DB/query builders respect these new filters. Alternatively, if not supporting
filtering yet, remove agent_id/run_id from search signature or explicitly state
in the search docstring that agent_id and run_id are not supported and
optionally emit a debug log/warning when they are provided.
- Around line 458-481: In _legacy_search: sanitize and escape SQL LIKE wildcards
in the incoming text_query from query before building the parameterized pattern,
replacing '%' and '_' (and optionally the escape character) with escaped
versions, then use a single parameterized pattern (e.g., the escaped text
wrapped with '%' on both sides) in the execute call (still using
query.get("limit") as the second parameter) and include an ESCAPE clause if your
SQL requires it; reference _legacy_search, short_db, text_query, and the execute
call to locate and update the code so the wildcard characters cannot be
injected.
- Around line 500-518: The current store_short_term implementation repeatedly
creates and closes event loops (asyncio.new_event_loop()), which can leak
resources and break in environments with an existing loop; replace that block
with a safe sync-to-async call: call the coroutine via
asyncio.run(self.store(key, data)) and catch RuntimeError for "event loop is
running"—in that case obtain the running loop with asyncio.get_running_loop()
and submit the coroutine via asyncio.run_coroutine_threadsafe(self.store(key,
data), loop) (or otherwise schedule it on the running loop) so you avoid
creating new loops; keep fallback to _legacy_store when primary_storage is False
and preserve the same key/data handling.

In `@src/praisonai-agents/praisonaiagents/storage/cloud_storage.py`:
- Around line 145-149: The run_in_executor calls are passing dicts as a single
positional arg to boto3 methods (e.g. self.s3_client.head_bucket) which expects
keyword args; replace those calls by binding keyword args using
functools.partial (or a small lambda) so the boto3 methods receive proper
kwargs. Update _verify_bucket (self.s3_client.head_bucket), read
(self.s3_client.get_object), write (self.s3_client.put_object / upload), delete
(self.s3_client.delete_object), search (list/scan calls), list_keys
(self.s3_client.list_objects_v2), clear (delete_objects / list + delete), exists
(head_object) and count to use functools.partial(self.s3_client.method,
Keyword=Value, ...) or an equivalent lambda for each run_in_executor invocation.
- Around line 479-482: The code passes self.credentials_path (a file path) into
gcs.Client via client_kwargs, but gcs.Client expects a credentials object;
change the logic around self.client creation so that when self.credentials_path
is set you call gcs.Client.from_service_account_json(self.credentials_path,
**client_kwargs) to load credentials from the JSON file, otherwise call
gcs.Client(**client_kwargs). Update the creation site referencing self.client,
client_kwargs, self.credentials_path, and gcs.Client to follow this conditional
flow.

In `@src/praisonai-agents/praisonaiagents/storage/dynamodb_storage.py`:
- Around line 324-328: The variable expression_values is initialized but never
used in the try block (alongside filter_expression), so remove the unused
expression_values initialization to clean up leftover code; if the intention was
to build an ExpressionAttributeValues mapping for DynamoDB, instead implement
and use it where you build the filter (e.g., populate and pass expression_values
into the query/scan call), otherwise delete the expression_values variable and
any related dead code in the try block to avoid unused-variable warnings.
- Around line 444-483: The batch_write method mixes the synchronous DynamoDB
context manager self.table.batch_writer() with asynchronous run_in_executor
calls, causing unsafe sync/async interplay; fix by collecting prepared items
inside batch_write (use self._prepare_item and set id/created_at/updated_at/ttl)
into a local list, then either (a) perform the entire batch_writer context and
call batch.put_item synchronously for each prepared item inside run_in_executor
(i.e., wrap a function that opens self.table.batch_writer() and loops
batch.put_item(items) and run that function in the executor), or (b) avoid
run_in_executor entirely and use synchronous code inside the async method by
executing the whole batching block in loop.run_in_executor; ensure you reference
batch_write, self.table.batch_writer, batch.put_item, self._prepare_item and
preserve results mapping per record and error handling.
- Around line 429-436: The code uses the synchronous context manager
self.table.batch_writer() while awaiting loop.run_in_executor for each
batch.delete_item, which can let the context manager exit before delete tasks
complete; fix by performing the whole batch_writer block synchronously inside
the executor (or perform deletes synchronously inside the context). Concretely,
move the batch_writer loop into a synchronous helper (e.g.,
_sync_delete_items(items)) that calls batch.delete_item(...) for each item using
the same self.table.batch_writer(), then call loop.run_in_executor(None,
self._sync_delete_items, response.get("Items", [])) or simply call
batch.delete_item(...) directly without await inside the existing context so the
__exit__ flushes after deletes finish; reference symbols:
self.table.batch_writer, batch.delete_item, loop.run_in_executor.

In `@src/praisonai-agents/praisonaiagents/storage/mongodb_storage.py`:
- Around line 66-92: The lazy init in _ensure_connection is racy: add an
asyncio.Lock (e.g., self._init_lock) created in the class __init__ to serialize
initialization, then wrap the init block in "async with self._init_lock:" and
re-check self._initialized inside the lock before creating AsyncIOMotorClient,
running await self.client.admin.command('ping'), setting
self.database/collection, calling await self._create_indexes(), and setting
self._initialized; keep existing exception handling and logger calls
(self.logger) intact so only one task performs initialization and others return
once _initialized is True.
- Around line 218-243: The list_keys method is vulnerable to regex injection
because the user-provided prefix is interpolated directly into a regex; update
the list_keys implementation (function name list_keys in mongodb_storage.py) to
escape regex metacharacters before building the query (e.g., use Python's
re.escape on the prefix) and then use the escaped value in the {"$regex":
f"^{escaped_prefix}"} filter; add the necessary import (re) if missing and keep
the rest of the query/limit/sorting logic unchanged.

In `@src/praisonai-agents/praisonaiagents/storage/redis_storage.py`:
- Around line 201-203: The except block currently catches a non-existent
json.JSONEncodeError; replace that with concrete serialization/runtime
exceptions thrown by json.dumps (e.g., TypeError and ValueError) so the handler
becomes except (RedisError, TypeError, ValueError) as e; make the same change
for the similar except block around lines 371-374 to ensure both write/serialize
failure paths log and return False correctly.

In `@src/praisonai-agents/praisonaiagents/storage/sqlite_storage.py`:
- Around line 44-45: The __init__ currently uses
asyncio.get_event_loop().is_running() and asyncio.create_task(self._init_db())
which can raise DeprecationWarning and can leave the task
unreferenced/garbage-collected; change this by removing the create_task call
from __init__ and provide an explicit async factory or initializer (e.g., an
async classmethod like async_create or an async_init method) that awaits
self._init_db(), keep the existing synchronous fallback _init_db_sync for sync
contexts, and update callers to use the async factory when they need async
initialization; reference the constructors and the methods _init_db and
_init_db_sync when making this change.
🧹 Nitpick comments (8)
src/praisonai-agents/praisonaiagents/storage/sqlite_storage.py (1)

194-198: Metadata search using LIKE pattern is fragile.

The JSON text search pattern '%"{key}": "{value}"%' assumes specific JSON formatting and won't match numeric values, booleans, or differently formatted JSON. Consider using SQLite's JSON functions if available (SQLite 3.38+).

📝 Note for consideration

For SQLite 3.38+, you could use:

# Using json_extract for more reliable matching
where_conditions.append("json_extract(metadata, ?) = ?")
params.append(f'$.{key}')
params.append(value)

This is optional since the current approach works for string values and maintains broader SQLite version compatibility.

src/praisonai-agents/praisonaiagents/storage/postgresql_storage.py (1)

94-113: Consider validating schema and table names.

While static analysis SQL injection warnings are false positives when config comes from trusted sources, if these values could ever come from user input, they should be validated against a whitelist pattern (e.g., alphanumeric with underscores only).

🛡️ Optional validation example
import re

def _validate_identifier(self, name: str, identifier_type: str) -> str:
    """Validate SQL identifier to prevent injection."""
    if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
        raise ValueError(f"Invalid {identifier_type}: {name}")
    return name

This is optional if config always comes from trusted sources like environment variables or hardcoded values.

src/praisonai-agents/praisonaiagents/storage/cloud_storage.py (1)

286-311: Duplicated _matches_query method across three classes.

The _matches_query method is identical in S3Storage, GCSStorage, and AzureStorage. Consider extracting this to the BaseStorage class to reduce duplication.

♻️ Suggested refactor

Add to BaseStorage in base.py:

def _matches_query(self, record: Dict[str, Any], query: Dict[str, Any]) -> bool:
    """Check if a record matches the search query."""
    # Text search in content
    if "text" in query:
        content = str(record.get("content", "")).lower()
        search_text = query["text"].lower()
        if search_text not in content:
            return False
    
    # Metadata search
    if "metadata" in query:
        record_metadata = record.get("metadata", {})
        for key, value in query["metadata"].items():
            if record_metadata.get(key) != value:
                return False
    
    # Time range filters
    if "created_after" in query:
        if record.get("created_at", 0) < query["created_after"]:
            return False
    
    if "created_before" in query:
        if record.get("created_at", float('inf')) > query["created_before"]:
            return False
    
    return True

Then remove the duplicated methods from S3Storage, GCSStorage, and AzureStorage.

Also applies to: 604-629, 898-923

src/praisonai-agents/praisonaiagents/memory/__init__.py (1)

23-23: Consider exporting ENHANCED_AVAILABLE for feature detection.

The ENHANCED_AVAILABLE flag could be useful for consumers to detect which implementation is active. Consider adding it to __all__.

📝 Optional enhancement
-__all__ = ["Memory"] 
+__all__ = ["Memory", "ENHANCED_AVAILABLE"]

This allows consumers to check availability with:

from praisonaiagents.memory import Memory, ENHANCED_AVAILABLE
if ENHANCED_AVAILABLE:
    # Use enhanced features
src/praisonai-agents/praisonaiagents/storage/mongodb_storage.py (2)

286-298: Unused variable result from bulk_write.

The result from bulk_write is assigned but never used. While this works, consider either using it to provide more accurate per-key success tracking or removing the assignment.

♻️ Proposed fix
             # Execute bulk operation
             if operations:
-                result = await self.collection.bulk_write(operations)
-
-                # Mark all as successful if bulk operation succeeded
-                for key in records.keys():
-                    results[key] = True
+                await self.collection.bulk_write(operations)
+                # Mark all as successful if bulk operation completed without exception
+                results = {key: True for key in records.keys()}

341-345: Missing async keyword on close method.

The close method is synchronous, but other storage backends may expect it to be async for consistency. The motor client's close() is synchronous, so this works, but consider adding async for interface consistency.

src/praisonai-agents/praisonaiagents/__init__.py (1)

134-143: Consider exposing availability flags in __all__.

The storage classes in __all__ may be None if dependencies aren't installed. Consider also exporting availability flags (e.g., _storage_available) or documenting this behavior so users can check before using these classes.

src/praisonai-agents/praisonaiagents/storage/__init__.py (1)

54-64: Consider exporting availability flags.

The __all__ exports all storage classes but not the availability flags (MONGODB_AVAILABLE, etc.). Exporting these would help consumers check backend availability without attempting imports.

♻️ Proposed enhancement
 __all__ = [
+    # Availability flags
+    "MONGODB_AVAILABLE",
+    "POSTGRESQL_AVAILABLE",
+    "REDIS_AVAILABLE",
+    "DYNAMODB_AVAILABLE",
+    "CLOUD_AVAILABLE",
+    # Storage classes
     "BaseStorage",
     "SQLiteStorage",

Comment on lines +157 to +178
def _create_storage_backend(self, provider: str, config: Dict[str, Any]) -> BaseStorage:
    """Create a storage backend instance."""
    provider_lower = provider.lower()

    if provider_lower == "sqlite":
        return SQLiteStorage(config)
    elif provider_lower == "mongodb":
        return MongoDBStorage(config)
    elif provider_lower == "postgresql":
        return PostgreSQLStorage(config)
    elif provider_lower == "redis":
        return RedisStorage(config)
    elif provider_lower == "dynamodb":
        return DynamoDBStorage(config)
    elif provider_lower == "s3":
        return S3Storage(config)
    elif provider_lower == "gcs":
        return GCSStorage(config)
    elif provider_lower == "azure":
        return AzureStorage(config)
    else:
        raise ValueError(f"Unsupported storage provider: {provider}")

⚠️ Potential issue | 🟠 Major

Missing None checks before instantiating optional backends.

When storage backends aren't available, the classes are None. This code will raise TypeError instead of a helpful error when trying to instantiate an unavailable backend.

🐛 Proposed fix to add availability checks
     def _create_storage_backend(self, provider: str, config: Dict[str, Any]) -> BaseStorage:
         """Create a storage backend instance."""
         provider_lower = provider.lower()
 
         if provider_lower == "sqlite":
             return SQLiteStorage(config)
         elif provider_lower == "mongodb":
+            if MongoDBStorage is None:
+                raise ImportError("MongoDB storage requires motor and pymongo. Install with: pip install motor pymongo")
             return MongoDBStorage(config)
         elif provider_lower == "postgresql":
+            if PostgreSQLStorage is None:
+                raise ImportError("PostgreSQL storage requires asyncpg. Install with: pip install asyncpg")
             return PostgreSQLStorage(config)
         elif provider_lower == "redis":
+            if RedisStorage is None:
+                raise ImportError("Redis storage requires redis. Install with: pip install redis[aio]")
             return RedisStorage(config)
         elif provider_lower == "dynamodb":
+            if DynamoDBStorage is None:
+                raise ImportError("DynamoDB storage requires boto3. Install with: pip install boto3")
             return DynamoDBStorage(config)
         elif provider_lower == "s3":
+            if S3Storage is None:
+                raise ImportError("S3 storage requires boto3. Install with: pip install boto3")
             return S3Storage(config)
         elif provider_lower == "gcs":
+            if GCSStorage is None:
+                raise ImportError("GCS storage requires google-cloud-storage. Install with: pip install google-cloud-storage")
             return GCSStorage(config)
         elif provider_lower == "azure":
+            if AzureStorage is None:
+                raise ImportError("Azure storage requires azure-storage-blob. Install with: pip install azure-storage-blob")
             return AzureStorage(config)
         else:
             raise ValueError(f"Unsupported storage provider: {provider}")
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 178-178: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py` around lines
157 - 178, The _create_storage_backend function currently instantiates backend
classes directly which can be None when optional backends are missing; update
each branch (e.g., SQLiteStorage, MongoDBStorage, PostgreSQLStorage,
RedisStorage, DynamoDBStorage, S3Storage, GCSStorage, AzureStorage) to first
check whether the class reference is not None and if it is None raise a clear
ImportError or ValueError (e.g., "Backend X is unavailable — install optional
dependency Y") otherwise instantiate and return the backend with config; keep
the existing provider_lower checks and the final unsupported-provider
ValueError.

Comment on lines +458 to +481
def _legacy_search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Legacy search implementation for backward compatibility."""
    try:
        if hasattr(self, 'short_db'):
            conn = sqlite3.connect(self.short_db)
            text_query = query.get("text", "")
            rows = conn.execute(
                "SELECT id, content, meta, created_at FROM short_mem WHERE content LIKE ? LIMIT ?",
                (f"%{text_query}%", query.get("limit", 100))
            ).fetchall()
            conn.close()

            results = []
            for row in rows:
                results.append({
                    "id": row[0],
                    "content": row[1],
                    "metadata": json.loads(row[2] or "{}"),
                    "created_at": row[3]
                })
            return results
    except Exception as e:
        logger.error(f"Legacy search failed: {e}")
        return []

⚠️ Potential issue | 🟡 Minor

SQL injection vulnerability in legacy search.

While the search value is passed to SQLite as a bound parameter, the user-supplied text is wrapped in % wildcards without escaping any % or _ characters it contains, so callers can inject additional LIKE wildcards into the pattern.

🛡️ Proposed fix - escape SQL LIKE wildcards
     def _legacy_search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
         """Legacy search implementation for backward compatibility."""
         try:
             if hasattr(self, 'short_db'):
                 conn = sqlite3.connect(self.short_db)
                 text_query = query.get("text", "")
+                # Escape SQL LIKE wildcards in user input
+                text_query = text_query.replace("%", "\\%").replace("_", "\\_")
                 rows = conn.execute(
-                    "SELECT id, content, meta, created_at FROM short_mem WHERE content LIKE ? LIMIT ?",
+                    "SELECT id, content, meta, created_at FROM short_mem WHERE content LIKE ? ESCAPE '\\' LIMIT ?",
                     (f"%{text_query}%", query.get("limit", 100))
                 ).fetchall()
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 479-479: Do not catch blind exception: Exception

(BLE001)


[warning] 480-480: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py` around lines
458 - 481, In _legacy_search: sanitize and escape SQL LIKE wildcards in the
incoming text_query from query before building the parameterized pattern,
replacing '%' and '_' (and optionally the escape character) with escaped
versions, then use a single parameterized pattern (e.g., the escaped text
wrapped with '%' on both sides) in the execute call (still using
query.get("limit") as the second parameter) and include an ESCAPE clause if your
SQL requires it; reference _legacy_search, short_db, text_query, and the execute
call to locate and update the code so the wildcard characters cannot be
injected.
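
For reference, a standalone sketch of the escaping idea against an in-memory SQLite table; the table and column names are illustrative, not the PR's schema:

import sqlite3

def escape_like(text: str) -> str:
    """Escape LIKE wildcards (and the escape character itself) so input matches literally."""
    return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE short_mem (id INTEGER PRIMARY KEY, content TEXT)")
conn.executemany("INSERT INTO short_mem (content) VALUES (?)",
                 [("100% done",), ("1000 done",)])

user_query = "100%"  # contains a LIKE wildcard
rows = conn.execute(
    "SELECT content FROM short_mem WHERE content LIKE ? ESCAPE '\\' LIMIT ?",
    (f"%{escape_like(user_query)}%", 10),
).fetchall()
print(rows)  # only the row containing the literal "100%", not "1000 done"
conn.close()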

Comment on lines +500 to +518
    def store_short_term(self, text: str, metadata: Dict[str, Any] = None, **kwargs):
        """Store in short-term memory (legacy compatibility)."""
        key = str(time.time_ns())
        data = {
            "content": text,
            "metadata": metadata or {},
            "created_at": time.time()
        }

        if self.primary_storage:
            # Use async storage
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                return loop.run_until_complete(self.store(key, data))
            finally:
                loop.close()
        else:
            return self._legacy_store(key, data)

⚠️ Potential issue | 🟠 Major

Creating new event loops repeatedly can leak resources and break in async environments.

The asyncio.new_event_loop() / loop.close() pattern on every sync wrapper call is inefficient and fails in environments that already have a running event loop (e.g., Jupyter notebooks, async web frameworks). Consider asyncio.run() when no loop is running, with a fallback (or a cached loop) for contexts that already have one.

🔧 Proposed fix using asyncio.run() or loop caching
+    def _run_async(self, coro):
+        """Run an async coroutine from sync context."""
+        try:
+            loop = asyncio.get_running_loop()
+        except RuntimeError:
+            # No running loop - safe to use asyncio.run()
+            return asyncio.run(coro)
+        else:
+            # Already in async context - use nest_asyncio or create task
+            import nest_asyncio
+            nest_asyncio.apply()
+            return loop.run_until_complete(coro)
+
     def store_short_term(self, text: str, metadata: Dict[str, Any] = None, **kwargs):
         """Store in short-term memory (legacy compatibility)."""
         key = str(time.time_ns())
         data = {
             "content": text,
             "metadata": metadata or {},
             "created_at": time.time()
         }
 
         if self.primary_storage:
-            # Use async storage
-            loop = asyncio.new_event_loop()
-            asyncio.set_event_loop(loop)
-            try:
-                return loop.run_until_complete(self.store(key, data))
-            finally:
-                loop.close()
+            return self._run_async(self.store(key, data))
         else:
             return self._legacy_store(key, data)

Alternatively, simpler fix using asyncio.run() (Python 3.7+):

         if self.primary_storage:
-            loop = asyncio.new_event_loop()
-            asyncio.set_event_loop(loop)
-            try:
-                return loop.run_until_complete(self.store(key, data))
-            finally:
-                loop.close()
+            return asyncio.run(self.store(key, data))
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 500-500: PEP 484 prohibits implicit Optional

Convert to T | None

(RUF013)


[warning] 500-500: Unused method argument: kwargs

(ARG002)

🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py` around lines
500 - 518, The current store_short_term implementation repeatedly creates and
closes event loops (asyncio.new_event_loop()), which can leak resources and
break in environments with an existing loop; replace that block with a safe
sync-to-async call: call the coroutine via asyncio.run(self.store(key, data))
and catch RuntimeError for "event loop is running"—in that case obtain the
running loop with asyncio.get_running_loop() and submit the coroutine via
asyncio.run_coroutine_threadsafe(self.store(key, data), loop) (or otherwise
schedule it on the running loop) so you avoid creating new loops; keep fallback
to _legacy_store when primary_storage is False and preserve the same key/data
handling.
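
A self-contained sketch of a sync-to-async helper in this spirit; it avoids nest_asyncio by running the coroutine on a worker thread when a loop is already running (the helper and demo names are illustrative):

import asyncio
import concurrent.futures

def run_async(coro):
    """Run a coroutine from synchronous code, whether or not a loop is already running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread: asyncio.run creates and tears one down cleanly.
        return asyncio.run(coro)
    # A loop is running (e.g. Jupyter): blocking it would deadlock,
    # so run the coroutine on a separate thread with its own loop.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()

async def fake_store(key, data):
    await asyncio.sleep(0)
    return True

print(run_async(fake_store("k", {"content": "hello"})))  # True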

Comment on lines +535 to +541
    def store_long_term(self, text: str, metadata: Dict[str, Any] = None, **kwargs):
        """Store in long-term memory (legacy compatibility)."""
        return self.store_short_term(text, metadata, **kwargs)

    def search_long_term(self, query: str, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
        """Search long-term memory (legacy compatibility)."""
        return self.search_short_term(query, limit, **kwargs)

⚠️ Potential issue | 🟡 Minor

store_long_term and search_long_term are aliases - document this behavior.

These methods simply delegate to short-term equivalents, meaning long-term and short-term memory are not differentiated. This may surprise users expecting separate storage. Consider adding a docstring note or actually differentiating them.

🧰 Tools
🪛 Ruff (0.14.14)

[warning] 535-535: PEP 484 prohibits implicit Optional

Convert to T | None

(RUF013)

🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py` around lines
535 - 541, Update the docstrings for store_long_term and search_long_term to
explicitly state they are aliases that delegate to store_short_term and
search_short_term (i.e., long-term is not differentiated from short-term in this
implementation), and add a short note for users about this behavior and
potential future differentiation so callers aren’t surprised; locate the methods
named store_long_term and search_long_term and modify their triple-quoted
docstrings accordingly.
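
A runnable sketch of the documentation option using a stand-in class; the docstring wording is only a suggestion, not the PR's text:

from typing import Any, Dict, List, Optional

class MemoryAliasesDemo:
    """Minimal stand-in showing how the aliasing could be documented."""

    def store_short_term(self, text: str, metadata: Optional[Dict[str, Any]] = None, **kwargs) -> bool:
        print(f"stored: {text!r}")
        return True

    def search_short_term(self, query: str, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
        return [{"content": query, "metadata": {}}][:limit]

    def store_long_term(self, text: str, metadata: Optional[Dict[str, Any]] = None, **kwargs) -> bool:
        """Store in long-term memory (legacy compatibility).

        Note: this implementation does not differentiate long-term from
        short-term storage; the call is delegated to store_short_term.
        """
        return self.store_short_term(text, metadata, **kwargs)

    def search_long_term(self, query: str, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
        """Search long-term memory (legacy compatibility).

        Note: delegates to search_short_term; results come from the same
        backing store as short-term memory.
        """
        return self.search_short_term(query, limit, **kwargs)

m = MemoryAliasesDemo()
m.store_long_term("hello")
print(m.search_long_term("hello"))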

Comment on lines +566 to +572
    def search(self, query: str, user_id: Optional[str] = None, agent_id: Optional[str] = None,
               run_id: Optional[str] = None, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
        """Generic search method (legacy compatibility)."""
        if user_id:
            return self.search_user_memory(user_id, query, limit=limit, **kwargs)
        else:
            return self.search_short_term(query, limit=limit, **kwargs)

⚠️ Potential issue | 🟡 Minor

Unused parameters agent_id and run_id in search method.

These parameters are accepted but never used, which could mislead users into thinking filtering by agent/run is supported.

🛡️ Options: implement or document as not-yet-supported

Option 1 - Document that these are placeholders:

     def search(self, query: str, user_id: Optional[str] = None, agent_id: Optional[str] = None, 
                run_id: Optional[str] = None, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
-        """Generic search method (legacy compatibility)."""
+        """Generic search method (legacy compatibility).
+        
+        Note: agent_id and run_id parameters are accepted for API compatibility
+        but are not currently used for filtering.
+        """

Option 2 - Implement filtering:

         if user_id:
             return self.search_user_memory(user_id, query, limit=limit, **kwargs)
+        elif agent_id:
+            results = self.search_short_term(query, limit=20)
+            return [r for r in results if r.get("metadata", {}).get("agent_id") == agent_id][:limit]
         else:
             return self.search_short_term(query, limit=limit, **kwargs)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
    def search(self, query: str, user_id: Optional[str] = None, agent_id: Optional[str] = None,
               run_id: Optional[str] = None, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
        """Generic search method (legacy compatibility)."""
        if user_id:
            return self.search_user_memory(user_id, query, limit=limit, **kwargs)
        else:
            return self.search_short_term(query, limit=limit, **kwargs)
    def search(self, query: str, user_id: Optional[str] = None, agent_id: Optional[str] = None,
               run_id: Optional[str] = None, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
        """Generic search method (legacy compatibility).

        Note: agent_id and run_id parameters are accepted for API compatibility
        but are not currently used for filtering.
        """
        if user_id:
            return self.search_user_memory(user_id, query, limit=limit, **kwargs)
        else:
            return self.search_short_term(query, limit=limit, **kwargs)
Suggested change
    def search(self, query: str, user_id: Optional[str] = None, agent_id: Optional[str] = None,
               run_id: Optional[str] = None, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
        """Generic search method (legacy compatibility)."""
        if user_id:
            return self.search_user_memory(user_id, query, limit=limit, **kwargs)
        else:
            return self.search_short_term(query, limit=limit, **kwargs)
    def search(self, query: str, user_id: Optional[str] = None, agent_id: Optional[str] = None,
               run_id: Optional[str] = None, limit: int = 5, **kwargs) -> List[Dict[str, Any]]:
        """Generic search method (legacy compatibility)."""
        if user_id:
            return self.search_user_memory(user_id, query, limit=limit, **kwargs)
        elif agent_id:
            results = self.search_short_term(query, limit=20)
            return [r for r in results if r.get("metadata", {}).get("agent_id") == agent_id][:limit]
        else:
            return self.search_short_term(query, limit=limit, **kwargs)
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 566-566: Unused method argument: agent_id

(ARG002)


[warning] 567-567: Unused method argument: run_id

(ARG002)

🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/memory/enhanced_memory.py` around lines
566 - 572, The search method currently accepts agent_id and run_id but never
uses them; either implement filtering or document they're placeholders. To
implement, propagate agent_id and run_id into the underlying calls (pass them
from search into search_user_memory and search_short_term) and update those
functions (search_user_memory, search_short_term) to accept and apply
agent_id/run_id filters in their query logic; ensure tests and any DB/query
builders respect these new filters. Alternatively, if not supporting filtering
yet, remove agent_id/run_id from search signature or explicitly state in the
search docstring that agent_id and run_id are not supported and optionally emit
a debug log/warning when they are provided.

Comment on lines +444 to +483
    async def batch_write(self, records: Dict[str, Dict[str, Any]]) -> Dict[str, bool]:
        """Optimized batch write for DynamoDB."""
        await self._ensure_connection()

        loop = asyncio.get_event_loop()
        results = {}

        try:
            current_time = time.time()

            # DynamoDB batch_writer handles batching automatically
            with self.table.batch_writer() as batch:
                for key, data in records.items():
                    item = data.copy()
                    item["id"] = key
                    item["updated_at"] = current_time

                    if "created_at" not in item:
                        item["created_at"] = current_time

                    if self.ttl_attribute and self.ttl_attribute not in item:
                        ttl_seconds = 30 * 24 * 60 * 60
                        item[self.ttl_attribute] = int(time.time() + ttl_seconds)

                    item = self._prepare_item(item)

                    await loop.run_in_executor(
                        None,
                        batch.put_item,
                        {"Item": item}
                    )

                    results[key] = True

        except Exception as e:
            self.logger.error(f"Failed batch write: {e}")
            for key in records.keys():
                results[key] = False

        return results

⚠️ Potential issue | 🟠 Major

Same issue in batch_write with sync context manager.

The batch_write method has the same issue as clear() - mixing synchronous batch_writer() context manager with async run_in_executor calls inside it.

🐛 Suggested fix
     async def batch_write(self, records: Dict[str, Dict[str, Any]]) -> Dict[str, bool]:
         """Optimized batch write for DynamoDB."""
         await self._ensure_connection()
         
         loop = asyncio.get_event_loop()
         results = {}
         
         try:
             current_time = time.time()
             
-            # DynamoDB batch_writer handles batching automatically
-            with self.table.batch_writer() as batch:
-                for key, data in records.items():
-                    item = data.copy()
-                    item["id"] = key
-                    item["updated_at"] = current_time
-                    
-                    if "created_at" not in item:
-                        item["created_at"] = current_time
-                    
-                    if self.ttl_attribute and self.ttl_attribute not in item:
-                        ttl_seconds = 30 * 24 * 60 * 60
-                        item[self.ttl_attribute] = int(time.time() + ttl_seconds)
-                    
-                    item = self._prepare_item(item)
-                    
-                    await loop.run_in_executor(
-                        None,
-                        batch.put_item,
-                        {"Item": item}
-                    )
-                    
-                    results[key] = True
+            # Prepare items
+            items_to_write = []
+            for key, data in records.items():
+                item = data.copy()
+                item["id"] = key
+                item["updated_at"] = current_time
+                if "created_at" not in item:
+                    item["created_at"] = current_time
+                if self.ttl_attribute and self.ttl_attribute not in item:
+                    ttl_seconds = 30 * 24 * 60 * 60
+                    item[self.ttl_attribute] = int(time.time() + ttl_seconds)
+                items_to_write.append((key, self._prepare_item(item)))
+            
+            # Write all items synchronously within executor
+            def write_all():
+                with self.table.batch_writer() as batch:
+                    for key, item in items_to_write:
+                        batch.put_item(Item=item)
+            
+            await loop.run_in_executor(None, write_all)
+            
+            for key in records.keys():
+                results[key] = True
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 478-478: Do not catch blind exception: Exception

(BLE001)


[warning] 479-479: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/storage/dynamodb_storage.py` around
lines 444 - 483, The batch_write method mixes the synchronous DynamoDB context
manager self.table.batch_writer() with asynchronous run_in_executor calls,
causing unsafe sync/async interplay; fix by collecting prepared items inside
batch_write (use self._prepare_item and set id/created_at/updated_at/ttl) into a
local list, then either (a) perform the entire batch_writer context and call
batch.put_item synchronously for each prepared item inside run_in_executor
(i.e., wrap a function that opens self.table.batch_writer() and loops
batch.put_item(items) and run that function in the executor), or (b) avoid
run_in_executor entirely and use synchronous code inside the async method by
executing the whole batching block in loop.run_in_executor; ensure you reference
batch_write, self.table.batch_writer, batch.put_item, self._prepare_item and
preserve results mapping per record and error handling.
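
An equivalent standalone sketch using asyncio.to_thread (Python 3.9+) to keep the whole boto3 batch_writer block synchronous on a worker thread; the table name and item shape are illustrative:

import asyncio
import boto3

async def batch_write(table_name: str, items: list) -> None:
    """Write pre-prepared items with boto3's synchronous batch_writer, off the event loop."""
    table = boto3.resource("dynamodb").Table(table_name)

    def write_all():
        # batch_writer() is a synchronous context manager; keep the whole
        # block on a worker thread instead of interleaving it with await.
        with table.batch_writer() as batch:
            for item in items:
                batch.put_item(Item=item)

    await asyncio.to_thread(write_all)

# Example (requires AWS credentials and an existing table):
# asyncio.run(batch_write("memory-table", [{"id": "k1", "content": "hello"}]))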

Comment on lines +66 to +92
    async def _ensure_connection(self):
        """Ensure MongoDB connection is established."""
        if not self._initialized:
            try:
                self.client = AsyncIOMotorClient(
                    self.url,
                    serverSelectionTimeoutMS=self.timeout
                )

                # Test connection
                await self.client.admin.command('ping')

                self.database = self.client[self.database_name]
                self.collection = self.database[self.collection_name]

                # Create indexes
                await self._create_indexes()

                self._initialized = True
                self.logger.info(f"Connected to MongoDB: {self.database_name}.{self.collection_name}")

            except ServerSelectionTimeoutError as e:
                self.logger.error(f"Failed to connect to MongoDB: {e}")
                raise
            except Exception as e:
                self.logger.error(f"Unexpected error connecting to MongoDB: {e}")
                raise

⚠️ Potential issue | 🟡 Minor

Consider thread-safety for lazy initialization.

The _ensure_connection method has a potential race condition if called concurrently from multiple async tasks - two tasks could both see _initialized = False and attempt to initialize simultaneously.

🔒 Proposed fix using asyncio.Lock
+        self._init_lock = asyncio.Lock()
         self._initialized = False
 
     async def _ensure_connection(self):
         """Ensure MongoDB connection is established."""
-        if not self._initialized:
+        if self._initialized:
+            return
+        async with self._init_lock:
+            if self._initialized:  # Double-check after acquiring lock
+                return
             try:
                 self.client = AsyncIOMotorClient(
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 88-88: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


[warning] 91-91: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/storage/mongodb_storage.py` around lines
66 - 92, The lazy init in _ensure_connection is racy: add an asyncio.Lock (e.g.,
self._init_lock) created in the class __init__ to serialize initialization, then
wrap the init block in "async with self._init_lock:" and re-check
self._initialized inside the lock before creating AsyncIOMotorClient, running
await self.client.admin.command('ping'), setting self.database/collection,
calling await self._create_indexes(), and setting self._initialized; keep
existing exception handling and logger calls (self.logger) intact so only one
task performs initialization and others return once _initialized is True.
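
A generic, runnable sketch of the double-checked pattern with asyncio.Lock, independent of the MongoDB specifics (class and method names are illustrative):

import asyncio

class LazyResource:
    def __init__(self):
        self._initialized = False
        self._init_lock = asyncio.Lock()

    async def _ensure_connection(self):
        if self._initialized:          # fast path, no lock contention
            return
        async with self._init_lock:
            if self._initialized:      # re-check: another task may have won the race
                return
            await asyncio.sleep(0)     # stands in for the real connect/ping/index setup
            self._initialized = True

async def main():
    r = LazyResource()
    # Ten concurrent callers; only one performs the initialization.
    await asyncio.gather(*(r._ensure_connection() for _ in range(10)))
    print(r._initialized)  # True

asyncio.run(main())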

Comment on lines +218 to +243
    async def list_keys(self, prefix: Optional[str] = None, limit: Optional[int] = None) -> List[str]:
        """List keys in storage."""
        await self._ensure_connection()

        try:
            # Build query for prefix filtering
            query = {}
            if prefix:
                query["_id"] = {"$regex": f"^{prefix}"}

            # Get only the _id field
            cursor = self.collection.find(query, {"_id": 1})

            if limit:
                cursor = cursor.limit(limit)

            cursor = cursor.sort("created_at", DESCENDING)

            keys = []
            async for doc in cursor:
                keys.append(doc["_id"])

            return keys
        except Exception as e:
            self.logger.error(f"Failed to list keys: {e}")
            return []

⚠️ Potential issue | 🟡 Minor

Potential regex injection in list_keys prefix filter.

User-provided prefix is directly interpolated into a regex pattern without escaping, which could cause unexpected behavior or errors if the prefix contains regex metacharacters.

🛡️ Proposed fix to escape regex metacharacters
+import re
+
     async def list_keys(self, prefix: Optional[str] = None, limit: Optional[int] = None) -> List[str]:
         """List keys in storage."""
         await self._ensure_connection()
 
         try:
             # Build query for prefix filtering
             query = {}
             if prefix:
-                query["_id"] = {"$regex": f"^{prefix}"}
+                query["_id"] = {"$regex": f"^{re.escape(prefix)}"}
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 240-240: Consider moving this statement to an else block

(TRY300)


[warning] 241-241: Do not catch blind exception: Exception

(BLE001)


[warning] 242-242: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/storage/mongodb_storage.py` around lines
218 - 243, The list_keys method is vulnerable to regex injection because the
user-provided prefix is interpolated directly into a regex; update the list_keys
implementation (function name list_keys in mongodb_storage.py) to escape regex
metacharacters before building the query (e.g., use Python's re.escape on the
prefix) and then use the escaped value in the {"$regex": f"^{escaped_prefix}"}
filter; add the necessary import (re) if missing and keep the rest of the
query/limit/sorting logic unchanged.
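
A tiny pure-Python illustration of why re.escape matters for the prefix filter (no MongoDB required; the key names are made up):

import re

prefix = "user.123"          # '.' is a regex metacharacter
keys = ["user.123:a", "userX123:b", "user.123:c"]

naive = [k for k in keys if re.match(f"^{prefix}", k)]
safe = [k for k in keys if re.match(f"^{re.escape(prefix)}", k)]

print(naive)  # includes 'userX123:b' because the unescaped '.' matched any character
print(safe)   # only the keys that literally start with 'user.123'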

Comment on lines +201 to +203
        except (RedisError, json.JSONEncodeError) as e:
            self.logger.error(f"Failed to write key {key}: {e}")
            return False

⚠️ Potential issue | 🟡 Minor

json.JSONEncodeError does not exist in the standard library.

The exception type json.JSONEncodeError is not a valid Python exception. The json module only defines JSONDecodeError. Serialization errors from json.dumps() would raise TypeError or ValueError.

🐛 Proposed fix
-        except (RedisError, json.JSONEncodeError) as e:
+        except (RedisError, TypeError, ValueError) as e:

Also applies to: 371-374

🧰 Tools
🪛 Ruff (0.14.14)

[warning] 202-202: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/storage/redis_storage.py` around lines
201 - 203, The except block currently catches a non-existent
json.JSONEncodeError; replace that with concrete serialization/runtime
exceptions thrown by json.dumps (e.g., TypeError and ValueError) so the handler
becomes except (RedisError, TypeError, ValueError) as e; make the same change
for the similar except block around lines 371-374 to ensure both write/serialize
failure paths log and return False correctly.
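
A quick standard-library check of what json.dumps actually raises, for reference:

import json

try:
    json.dumps({"key": object()})               # not JSON-serializable
except TypeError as e:
    print(f"TypeError: {e}")

try:
    json.dumps(float("nan"), allow_nan=False)   # invalid value when NaN is disallowed
except ValueError as e:
    print(f"ValueError: {e}")

# json.JSONEncodeError does not exist; only JSONDecodeError is defined.
print(hasattr(json, "JSONEncodeError"), hasattr(json, "JSONDecodeError"))  # False True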

Comment on lines +44 to +45
        # Initialize database
        asyncio.create_task(self._init_db()) if asyncio.get_event_loop().is_running() else self._init_db_sync()

⚠️ Potential issue | 🟡 Minor

Potential issue with asyncio.create_task in __init__.

The condition asyncio.get_event_loop().is_running() emits a DeprecationWarning in Python 3.10+ when no loop is running and may not behave as expected. Using asyncio.create_task in __init__ also leaves the task without a strong reference, so it can be garbage collected before it completes if nothing awaits or stores it.

🛠️ Suggested safer approach
-        # Initialize database
-        asyncio.create_task(self._init_db()) if asyncio.get_event_loop().is_running() else self._init_db_sync()
+        # Initialize database synchronously - async init can be called separately if needed
+        self._init_db_sync()

Alternatively, consider providing an explicit async factory method for async initialization contexts.

🤖 Prompt for AI Agents
In `@src/praisonai-agents/praisonaiagents/storage/sqlite_storage.py` around lines
44 - 45, The __init__ currently uses asyncio.get_event_loop().is_running() and
asyncio.create_task(self._init_db()) which can raise DeprecationWarning and can
leave the task unreferenced/garbage-collected; change this by removing the
create_task call from __init__ and provide an explicit async factory or
initializer (e.g., an async classmethod like async_create or an async_init
method) that awaits self._init_db(), keep the existing synchronous fallback
_init_db_sync for sync contexts, and update callers to use the async factory
when they need async initialization; reference the constructors and the methods
_init_db and _init_db_sync when making this change.
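
A runnable sketch of the async-factory alternative mentioned above; the class and method names are illustrative, not the PR's API:

import asyncio
from typing import Any, Dict

class AsyncInitStorage:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self._initialized = False   # no event-loop work in __init__

    async def _init_db(self):
        await asyncio.sleep(0)      # stands in for real schema setup
        self._initialized = True

    @classmethod
    async def create(cls, config: Dict[str, Any]) -> "AsyncInitStorage":
        """Build and fully initialize an instance inside an async context."""
        self = cls(config)
        await self._init_db()
        return self

async def main():
    storage = await AsyncInitStorage.create({"path": ":memory:"})
    print(storage._initialized)  # True

asyncio.run(main())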
