- 示例代码:
from fastapi import FastAPI
from sqlalchemy import func, String, Float, DateTime
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
from datetime import datetime
app = FastAPI()
# 1.创造异步引擎
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_test?charset=utf8"
async_engine = create_async_engine(
ASYNC_DATABASE_URL,
echo = True,
pool_size = 10,
max_overflow = 20
)
# 2. 定义模型类 : 基类 + 表对应的模型类
class Base(DeclarativeBase):
create_time : Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now,comment="创建时间")
update_time : Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now,onupdate=func.now(),comment="创建时间")
class Book(Base):
__tablename__ = "book"
id: Mapped[int] = mapped_column(primary_key=True,comment="书籍id")
bookname: Mapped[str] = mapped_column(String(255),comment="书名")
author: Mapped[str] = mapped_column(String(255), comment="作者")
price: Mapped[float] = mapped_column(Float,comment="价格")
publisher: Mapped[str] = mapped_column(String(255),comment="出版社")
async def create_tables():
# 获取异步引擎,创建事务 - 建表
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
@app.on_event("startup")
async def startup():
await create_tables()
@app.get("/")
async def root():
return {"message": "Hello World"}- 示例代码:
from fastapi import FastAPI, Depends
from sqlalchemy import func, String, Float, DateTime, select
from sqlalchemy.ext.asyncio import create_async_engine, async_session, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
from datetime import datetime
app = FastAPI()
# 1.创造异步引擎
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_test?charset=utf8"
async_engine = create_async_engine(
ASYNC_DATABASE_URL,
echo = True,
pool_size = 10,
max_overflow = 20
)
# 2. 定义模型类 : 基类 + 表对应的模型类
class Base(DeclarativeBase):
create_time : Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now,comment="创建时间")
update_time : Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now,onupdate=func.now(),comment="创建时间")
class Book(Base):
__tablename__ = "book"
id: Mapped[int] = mapped_column(primary_key=True,comment="书籍id")
bookname: Mapped[str] = mapped_column(String(255),comment="书名")
author: Mapped[str] = mapped_column(String(255), comment="作者")
price: Mapped[float] = mapped_column(Float,comment="价格")
publisher: Mapped[str] = mapped_column(String(255),comment="出版社")
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
class_ = AsyncSession,
expire_on_commit = False #提交后会话不过期,不会重新查询数据库
)
async def get_database():
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
@app.get("/book/books")
async def get_book_list(db:AsyncSession = Depends(get_database)):
result = await db.execute(select(Book))
book = result.scalars().all()
return book- 示例代码:
@app.get("/book/books")
async def get_book_list(db:AsyncSession = Depends(get_database)):
# result = await db.execute(select(Book)) # 查询 -> 返回一个 ORM 对象
# book = result.scalars().all() # 获取所有
# book = result.scalars().first() # 获取第一条
book = await db.get(Book,1) # 获取单条数据 -> 根据主键
return book注意事项:静态路由一定要写在动态路由上面。
- 示例代码:
@app.get("/book/search_price")
async def get_search_price(db:AsyncSession = Depends(get_database)):
result = await db.execute(select(Book).where(Book.price > 40)) # 查询 -> 返回一个 ORM 对象
# book = result.scalars().all() # 获取所有
book = result.scalars().all()
return book
@app.get("/book/{book_id}")
async def get_book_list(book_id:int,db:AsyncSession = Depends(get_database)):
result = await db.execute(select(Book).where(Book.id == book_id)) # 查询 -> 返回一个 ORM 对象
# book = result.scalars().all() # 获取所有
book = result.scalar_one_or_none()
return book
-
比较判断: ==;>;<;>=;<=
-
模糊查询: like()
-
与非查询: &;|;~
-
包含查询: in_()
- 示例代码:
@app.get("/book/search_author")
async def get_search_author(db:AsyncSession = Depends(get_database)):
result = await db.execute(select(Book).where(Book.author.like("曹%")))
book = result.scalars().all()
return book