This document is a detailed guide to using the SqlStorage class for database operations, including support for MySQL, PostgreSQL, SQLite, and other databases.
SqlStorage is an async/sync database storage implementation based on SQLAlchemy, providing a unified interface for handling various SQL database operations.
SqlStorage: The primary storage class that provides database connection and operation interfaces.
SqlKey: Specifies keys for database lookups and queries (for example, the primary key passed to `get`).
SqlCondition: Used to define query conditions (filters, ordering, and limit).
StorageData: Base class for data models.
# Core dependencies
pip install sqlalchemy
# MySQL support
pip install aiomysql PyMySQL
# PostgreSQL support
pip install asyncpg psycopg2
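# SQLite support (the sync sqlite3 driver is built into Python)
# No additional installation required for sync mode; async mode (sqlite+aiosqlite) needs: pip install aiosqlite
-- MySQL: create database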
CREATE DATABASE test_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
-- Create user (optional)
CREATE USER 'test_user'@'localhost' IDENTIFIED BY 'test_password';
GRANT ALL PRIVILEGES ON test_db.* TO 'test_user'@'localhost';
FLUSH PRIVILEGES;
-- PostgreSQL: create database
CREATE DATABASE test_db;
-- Create user (optional)
CREATE USER test_user WITH PASSWORD 'test_password';
GRANT ALL PRIVILEGES ON DATABASE test_db TO test_user;
from trpc_agent_sdk.storage import SqlStorage
# Async mode (recommended)
storage = SqlStorage(
    is_async=True,
    db_url="mysql+aiomysql://root:password@localhost/test_db",
    echo=True,  # Enable SQL logging
    pool_size=10,
    max_overflow=20,
)

# Sync mode
storage = SqlStorage(
    is_async=False,
    db_url="mysql+pymysql://root:password@localhost/test_db",
    echo=True,
)
from dataclasses import dataclass
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, Text
from trpc_agent_sdk.storage import StorageData
@dataclass
class UserData(StorageData):
    """User data model mapped to the ``users`` table."""

    __tablename__ = 'users'

    # Auto-incrementing primary key.
    id: int = Column(Integer, primary_key=True, autoincrement=True)
    # Required and unique.
    username: str = Column(String(50), unique=True, nullable=False)
    # Required.
    email: str = Column(String(100), nullable=False)
    # Optional profile fields.
    full_name: str = Column(String(100), nullable=True)
    bio: str = Column(Text, nullable=True)
    # Timestamps default to datetime.utcnow; updated_at also uses it on update.
    created_at: datetime = Column(DateTime, default=datetime.utcnow)
    updated_at: datetime = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
import asyncio
from trpc_agent_sdk.storage import SqlStorage, SqlKey, SqlCondition
async def basic_example():
    """Insert one user, fetch it by primary key, run a filtered query, then close the storage."""
    # Initialize storage
    storage = SqlStorage(
        is_async=True,
        db_url="mysql+aiomysql://root:password@localhost/test_db",
    )

    try:
        # Create database engine and tables
        await storage.create_sql_engine()

        # Use database session
        async with storage.create_db_session() as session:
            # Create new user
            user = UserData(
                username="john_doe",
                email="john@example.com",
                full_name="John Doe",
                bio="Software engineer",
            )

            # Add user; refresh so the generated ID is available on the object
            await storage.add(session, user)
            await storage.commit(session)
            await storage.refresh(session, user)
            print(f"Created user with ID: {user.id}")

            # Get user by primary key
            retrieved_user = await storage.get(
                session,
                SqlKey(key=(user.id,), storage_cls=UserData),
            )
            print(f"Retrieved user: {retrieved_user.username}")

            # Query users: an empty key plus a condition selects by filter
            newest_first = SqlCondition(
                filters=[UserData.email.like('%@example.com')],
                order_func=lambda: UserData.created_at.desc(),
                limit=10,
            )
            users = await storage.query(
                session,
                SqlKey(key=(), storage_cls=UserData),
                newest_first,
            )
            print(f"Found {len(users)} users")
    finally:
        # Always release the storage, even if a step above failed
        await storage.close()


# Run example
asyncio.run(basic_example())
import os
from dataclasses import dataclass
from typing import Dict, Any
@dataclass
class DatabaseConfig:
    """Connection settings for a MySQL database.

    Builds the async (``aiomysql``) and sync (``pymysql``) connection URLs
    and the engine keyword arguments from one set of fields.
    """

    host: str = "localhost"
    port: int = 3306
    username: str = "root"
    password: str = "password"
    database: str = "test_db"
    charset: str = "utf8mb4"

    # Connection pool settings
    pool_size: int = 10
    max_overflow: int = 20
    pool_timeout: int = 30
    pool_recycle: int = 3600

    # SQLAlchemy settings
    echo: bool = False
    echo_pool: bool = False

    def _build_url(self, driver: str) -> str:
        """Return the ``mysql+<driver>`` URL with percent-encoded credentials.

        Encoding is required because characters such as ``@``, ``/``, ``:``
        or ``#`` in the username or password would otherwise be read as URL
        delimiters and yield a wrong host or database.
        """
        from urllib.parse import quote

        # safe="" also encodes "/", which quote() leaves untouched by default.
        user = quote(self.username, safe="")
        secret = quote(self.password, safe="")
        return (
            f"mysql+{driver}://{user}:{secret}"
            f"@{self.host}:{self.port}/{self.database}?charset={self.charset}"
        )

    def get_async_url(self) -> str:
        """Get async connection URL"""
        return self._build_url("aiomysql")

    def get_sync_url(self) -> str:
        """Get sync connection URL"""
        return self._build_url("pymysql")

    def get_engine_kwargs(self) -> Dict[str, Any]:
        """Get engine parameters (logging flags and pool settings) as a dict."""
        return {
            "echo": self.echo,
            "echo_pool": self.echo_pool,
            "pool_size": self.pool_size,
            "max_overflow": self.max_overflow,
            "pool_timeout": self.pool_timeout,
            "pool_recycle": self.pool_recycle,
        }

    @classmethod
    def from_env(cls) -> 'DatabaseConfig':
        """Create configuration from environment variables.

        Reads ``DB_HOST``, ``DB_PORT``, ``DB_USER``, ``DB_PASSWORD``,
        ``DB_NAME`` and ``DB_ECHO``; every other field keeps its default.

        Raises:
            ValueError: if ``DB_PORT`` is set to a non-integer value.
        """
        return cls(
            host=os.getenv("DB_HOST", "localhost"),
            port=int(os.getenv("DB_PORT", "3306")),
            username=os.getenv("DB_USER", "root"),
            password=os.getenv("DB_PASSWORD", "password"),
            database=os.getenv("DB_NAME", "test_db"),
            # Only the literal string "true" (any case) enables echo.
            echo=os.getenv("DB_ECHO", "false").lower() == "true",
        )
# Create database engine and tables
await storage.create_sql_engine()
# Close database connection
await storage.close()# Create database session (recommended to use context manager)
async with storage.create_db_session() as session:
# Execute database operations here
pass
# Create raw session (requires manual management)
session = await storage.create_sql_session()async with storage.create_db_session() as session:
user = UserData(username="test", email="test@example.com")
await storage.add(session, user)
await storage.commit(session)
await storage.refresh(session, user) # Get auto-generated IDasync with storage.create_db_session() as session:
# Get by primary key
user_key = SqlKey(key=(user_id,), storage_cls=UserData)
user = await storage.get(session, user_key)async with storage.create_db_session() as session:
query_key = SqlKey(key=(), storage_cls=UserData)
# Simple query
condition = SqlCondition()
all_users = await storage.query(session, query_key, condition)
# Query with conditions
condition = SqlCondition(
filters=[
UserData.email.like('%@example.com'),
UserData.created_at > datetime(2024, 1, 1)
],
order_func=lambda: UserData.created_at.desc(),
limit=10
)
filtered_users = await storage.query(session, query_key, condition)async with storage.create_db_session() as session:
delete_key = SqlKey(key=(), storage_cls=UserData)
condition = SqlCondition(filters=[UserData.id == user_id])
await storage.delete(session, delete_key, condition)
await storage.commit(session)async with storage.create_db_session() as session:
user_key = SqlKey(key=(user_id,), storage_cls=UserData)
user = await storage.get(session, user_key)
if user:
user.bio = "Updated bio"
user.updated_at = datetime.utcnow()
await storage.commit(session)
await storage.refresh(session, user)async with storage.create_db_session() as session:
try:
# Execute multiple operations
await storage.add(session, user1)
await storage.add(session, user2)
await storage.commit(session)
except Exception as e:
# Auto-rollback (handled by context manager)
print(f"Transaction failed: {e}")
raiseasync with storage.create_db_session() as session:
users = [
UserData(username=f"user_{i}", email=f"user_{i}@example.com")
for i in range(10)
]
for user in users:
await storage.add(session, user)
await storage.commit(session)from sqlalchemy import and_, or_
# Complex condition: created after 2024-01-01 AND (gmail OR yahoo address),
# ordered by newest first, then by username
condition = SqlCondition(
    filters=[
        and_(
            UserData.created_at > datetime(2024, 1, 1),
            or_(
                UserData.email.like('%@gmail.com'),
                UserData.email.like('%@yahoo.com'),
            ),
        ),
    ],
    order_func=lambda: [UserData.created_at.desc(), UserData.username.asc()],
    limit=50,
)

# --- MySQL ---
# Async connection (recommended)
mysql_async_url = "mysql+aiomysql://username:password@host:port/database"
# Sync connection
mysql_sync_url = "mysql+pymysql://username:password@host:port/database"
# Connection with parameters
mysql_url = "mysql+aiomysql://user:pass@localhost/db?charset=utf8mb4&autocommit=true"
# SSL connection
mysql_ssl_url = "mysql+pymysql://user:pass@host/db?ssl_ca=/path/to/ca.pem"

# --- PostgreSQL ---
# Async connection
postgres_async_url = "postgresql+asyncpg://username:password@host:port/database"
# Sync connection
postgres_sync_url = "postgresql+psycopg2://username:password@host:port/database"
# Connection with parameters
postgres_url = "postgresql+asyncpg://user:pass@localhost/db?ssl=require"

# --- SQLite ---
# Async connection
sqlite_async_url = "sqlite+aiosqlite:///path/to/database.db"
# Sync connection
sqlite_sync_url = "sqlite:///path/to/database.db"
# In-memory database
sqlite_memory_url = "sqlite:///:memory:"
from dataclasses import dataclass
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, Text, Boolean, ForeignKey
from sqlalchemy.orm import relationship
from trpc_agent_sdk.storage import StorageData
@dataclass
class UserData(StorageData):
    """User data model mapped to the ``users`` table."""

    __tablename__ = 'users'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    # username, email and created_at are declared with index=True.
    username: str = Column(String(50), unique=True, nullable=False, index=True)
    email: str = Column(String(100), nullable=False, index=True)
    full_name: str = Column(String(100), nullable=True)
    bio: str = Column(Text, nullable=True)
    is_active: bool = Column(Boolean, default=True)
    created_at: datetime = Column(DateTime, default=datetime.utcnow, index=True)
    updated_at: datetime = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)


@dataclass
class PostData(StorageData):
    """Post data model mapped to the ``posts`` table."""

    __tablename__ = 'posts'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    title: str = Column(String(200), nullable=False)
    content: str = Column(Text, nullable=False)
    # Each post references its author through users.id.
    user_id: int = Column(Integer, ForeignKey('users.id'), nullable=False)
    created_at: datetime = Column(DateTime, default=datetime.utcnow)

    # Relationship (optional)
    # user = relationship("UserData", back_populates="posts")


@dataclass
class TagData(StorageData):
    """Tag data model mapped to the ``tags`` table."""

    __tablename__ = 'tags'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(50), unique=True, nullable=False)
    description: str = Column(String(200), nullable=True)
    created_at: datetime = Column(DateTime, default=datetime.utcnow)


@dataclass
class BaseModel(StorageData):
    """Base model class supplying ``id`` and timestamp columns to subclasses."""

    __abstract__ = True  # Abstract base class, will not create a table

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    created_at: datetime = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at: datetime = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)


@dataclass
class ProductData(BaseModel):
    """Product data model mapped to the ``products`` table."""

    __tablename__ = 'products'

    name: str = Column(String(100), nullable=False, index=True)
    price: int = Column(Integer, nullable=False)  # Stored in cents
    description: str = Column(Text, nullable=True)
    is_available: bool = Column(Boolean, default=True, index=True)
    # NOTE(review): no model for a `categories` table appears in this guide;
    # presumably one must be defined before tables are created - confirm.
    category_id: int = Column(Integer, ForeignKey('categories.id'), nullable=True)
import asyncio
from datetime import datetime
from trpc_agent_sdk.storage import SqlStorage, SqlKey, SqlCondition
class UserService:
    """User service class example.

    Wraps one async ``SqlStorage`` and opens a fresh session for each operation.
    """

    def __init__(self, db_url: str):
        self.storage = SqlStorage(
            is_async=True,
            db_url=db_url,
            echo=True,
            pool_size=10,
            max_overflow=20,
        )

    async def initialize(self):
        """Initialize database"""
        await self.storage.create_sql_engine()

    async def create_user(self, username: str, email: str, full_name: str = None) -> int:
        """Create a user and return its generated ID."""
        async with self.storage.create_db_session() as session:
            new_user = UserData(
                username=username,
                email=email,
                full_name=full_name,
                created_at=datetime.utcnow(),
            )
            await self.storage.add(session, new_user)
            await self.storage.commit(session)
            # Refresh so the auto-generated ID is populated on the object.
            await self.storage.refresh(session, new_user)
            return new_user.id

    async def get_user_by_id(self, user_id: int) -> UserData:
        """Get user by ID"""
        async with self.storage.create_db_session() as session:
            return await self.storage.get(
                session,
                SqlKey(key=(user_id,), storage_cls=UserData),
            )

    async def find_users_by_email_domain(self, domain: str, limit: int = 10) -> list:
        """Find up to ``limit`` users whose email ends with ``@domain``, newest first."""
        async with self.storage.create_db_session() as session:
            by_domain = SqlCondition(
                filters=[UserData.email.like(f'%@{domain}')],
                order_func=lambda: UserData.created_at.desc(),
                limit=limit,
            )
            lookup_key = SqlKey(key=(), storage_cls=UserData)
            return await self.storage.query(session, lookup_key, by_domain)

    async def update_user_bio(self, user_id: int, bio: str) -> bool:
        """Update user bio; return ``False`` when no user was found for the ID."""
        async with self.storage.create_db_session() as session:
            target = await self.storage.get(
                session,
                SqlKey(key=(user_id,), storage_cls=UserData),
            )
            if not target:
                return False
            target.bio = bio
            target.updated_at = datetime.utcnow()
            await self.storage.commit(session)
            return True

    async def delete_inactive_users(self, days: int = 30) -> int:
        """Delete inactive users not updated within ``days`` days; return how many matched."""
        from datetime import timedelta

        cutoff_date = datetime.utcnow() - timedelta(days=days)
        async with self.storage.create_db_session() as session:
            delete_key = SqlKey(key=(), storage_cls=UserData)
            stale = SqlCondition(
                filters=[
                    UserData.is_active == False,
                    UserData.updated_at < cutoff_date,
                ]
            )
            # Query users to be deleted first, so the count can be reported
            count = len(await self.storage.query(session, delete_key, stale))
            # Execute deletion
            await self.storage.delete(session, delete_key, stale)
            await self.storage.commit(session)
            return count

    async def close(self):
        """Close database connection"""
        await self.storage.close()
# Usage example
async def main():
    """Exercise UserService: create, fetch, search and update a user, then close."""
    service = UserService("mysql+aiomysql://root:password@localhost/test_db")
    try:
        await service.initialize()

        # Create user
        user_id = await service.create_user("john_doe", "john@example.com", "John Doe")
        print(f"Created user with ID: {user_id}")

        # Get user
        user = await service.get_user_by_id(user_id)
        print(f"Retrieved user: {user.username}")

        # Find users
        users = await service.find_users_by_email_domain("example.com")
        print(f"Found {len(users)} users with example.com email")

        # Update user
        success = await service.update_user_bio(user_id, "Updated bio")
        print(f"Update successful: {success}")
    finally:
        # Close the service even if a step above raised
        await service.close()


if __name__ == "__main__":
    asyncio.run(main())
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
async def safe_create_user(storage, username: str, email: str):
    """Safe user creation example.

    Returns the new user's ID, or ``None`` when the insert fails. The failure
    is printed, not raised.

    The ``try`` wraps the whole ``async with`` block so that the exception
    leaves the session context manager before it is handled here. This guide
    relies on that context manager for rollback; catching the error inside
    the block would hide it from the context manager.
    NOTE(review): confirm that create_db_session() rolls back on exception.
    """
    try:
        async with storage.create_db_session() as session:
            user = UserData(username=username, email=email)
            await storage.add(session, user)
            await storage.commit(session)
            await storage.refresh(session, user)
            return user.id
    except IntegrityError as e:
        print(f"Data integrity error (possibly duplicate data): {e}")
    except SQLAlchemyError as e:
        print(f"Database error: {e}")
    except Exception as e:
        # Deliberate catch-all: this helper reports failures instead of raising.
        print(f"Unknown error: {e}")
    return None


class DatabaseManager:
    """Database manager.

    Async context manager that creates a ``SqlStorage`` and its engine on
    entry and closes the storage on exit.
    """

    def __init__(self, db_url: str):
        self.storage = None
        self.db_url = db_url

    async def __aenter__(self):
        self.storage = SqlStorage(is_async=True, db_url=self.db_url)
        try:
            await self.storage.create_sql_engine()
        except BaseException:
            # __aexit__ does not run when __aenter__ raises, so release the
            # storage here before propagating the error.
            failed, self.storage = self.storage, None
            await failed.close()
            raise
        return self.storage

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.storage:
            # Clear the reference first so a repeated exit cannot close twice.
            storage, self.storage = self.storage, None
            await storage.close()
# Usage
async def example_with_manager():
    """Open a storage through DatabaseManager and a session inside it."""
    async with DatabaseManager("mysql+aiomysql://...") as storage:
        async with storage.create_db_session() as session:
            # Execute database operations
            pass


# Connection pool configuration
storage = SqlStorage(
    is_async=True,
    db_url=db_url,
    pool_size=20,  # Connection pool size
    max_overflow=30,  # Maximum overflow connections
    pool_timeout=30,  # Connection acquisition timeout
    pool_recycle=3600,  # Connection recycle time (seconds)
    pool_pre_ping=True,  # Ping connections before use from the pool
)


async def batch_create_users(storage, users_data: list):
    """Batch create users.

    Adds one ``UserData`` per dict in ``users_data`` and commits once.
    Failures are printed, not raised.

    The ``try`` wraps the ``async with`` block so the exception leaves the
    session context manager before being handled. That is what lets the
    session auto-rollback.
    NOTE(review): confirm that create_db_session() rolls back on exception.
    """
    try:
        async with storage.create_db_session() as session:
            for user_data in users_data:
                await storage.add(session, UserData(**user_data))
            await storage.commit(session)
    except Exception as e:
        print(f"Batch operation failed: {e}")
    else:
        print(f"Successfully created {len(users_data)} users")
# Use indexed fields for queries
condition = SqlCondition(
filters=[
UserData.username == "john_doe", # username has index
UserData.is_active == True # NOTE: is_active is not declared with index=True in the UserData model; add an index if you filter on it often
]
)
# Limit query result count
condition = SqlCondition(
filters=[UserData.created_at > datetime(2024, 1, 1)],
order_func=lambda: UserData.created_at.desc(),
limit=100 # Limit result count
)
- Connection Error
  # Error: Can't connect to MySQL server
  # Solution: Check database service status and connection parameters
  # Test connection
  try:
      await storage.create_sql_engine()
      print("Database connection successful")
  except Exception as e:
      print(f"Connection failed: {e}")
- Table Not Found Error
  # Error: Table 'database.table_name' doesn't exist
  # Solution: Ensure create_sql_engine() has been called
  await storage.create_sql_engine()  # This creates all tables
- Data Integrity Error
  # Error: Duplicate entry 'value' for key 'column_name'
  # Solution: Check unique constraint fields
  try:
      await storage.add(session, user)
      await storage.commit(session)
  except IntegrityError:
      print("Record already exists or violates a constraint")
# Enable verbose logging
import logging

logging.basicConfig(level=logging.DEBUG)

storage = SqlStorage(
    is_async=True,
    db_url=db_url,
    echo=True,  # Display SQL statements
    echo_pool=True,  # Display connection pool info
)
# Install dependencies
pip install sqlalchemy aiomysql
# Set environment variables
export DB_URL="mysql+aiomysql://root:password@localhost/test_db"
# Use a different database
export DB_URL="postgresql+asyncpg://user:pass@localhost/test_db"
- psycopg2
Required package: pip install psycopg2-binary url: "postgresql+psycopg2://username:password@localhost:5432/mydb"
- asyncpg
Required package: pip install asyncpg url: "postgresql+asyncpg://username:password@localhost:5432/mydb"
- pg8000
Required package: pip install pg8000 url: "postgresql+pg8000://username:password@localhost:5432/mydb"
- PyMySQL
Required package: pip install PyMySQL url: "mysql+pymysql://username:password@localhost:3306/mydb"
- mysqlclient
Required package: pip install mysqlclient url: "mysql+mysqldb://username:password@localhost:3306/mydb"
- mysqlconnector
Required package: pip install mysql-connector-python url: "mysql+mysqlconnector://username:password@localhost:3306/mydb"
- aiomysql
Required package: pip install aiomysql url: "mysql+aiomysql://username:password@localhost:3306/mydb"
- sqlite3
Built-in, no installation required url: "sqlite:///./test.db"
- aiosqlite
Required package: pip install aiosqlite url: "sqlite+aiosqlite:///./test.db"
- cx_Oracle
Required package: pip install cx_Oracle url: "oracle+cx_oracle://username:password@localhost:1521/xe"
- oracledb
Required package: pip install oracledb url: "oracle+oracledb://username:password@localhost:1521/xe"
- pyodbc
Required package: pip install pyodbc url: "mssql+pyodbc://username:password@server:1433/database?driver=ODBC+Driver+17+for+SQL+Server"
- pymssql
Required package: pip install pymssql url: "mssql+pymssql://username:password@server:1433/database"