Skip to content

Latest commit

 

History

History
164 lines (130 loc) · 5.11 KB

File metadata and controls

164 lines (130 loc) · 5.11 KB

SQLAlchemy ORM

1. 建表

  • 示例代码:
from fastapi import FastAPI
from sqlalchemy import func, String, Float, DateTime
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
from datetime import datetime
app = FastAPI()

# 1.创造异步引擎
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_test?charset=utf8"
async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    echo = True,
    pool_size = 10,
    max_overflow = 20
)

# 2. 定义模型类 : 基类 + 表对应的模型类
class Base(DeclarativeBase):
    create_time : Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now,comment="创建时间")
    update_time : Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now,onupdate=func.now(),comment="创建时间")

class Book(Base):
    __tablename__ = "book"
    id: Mapped[int] = mapped_column(primary_key=True,comment="书籍id")
    bookname: Mapped[str] = mapped_column(String(255),comment="书名")
    author: Mapped[str] = mapped_column(String(255), comment="作者")
    price: Mapped[float] = mapped_column(Float,comment="价格")
    publisher: Mapped[str] = mapped_column(String(255),comment="出版社")

async def create_tables():
    # 获取异步引擎,创建事务 - 建表
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


@app.on_event("startup")
async def startup():
    await create_tables()


@app.get("/")
async def root():
    return {"message": "Hello World"}

2. 在路由中使用ORM

  • 示例代码:
from fastapi import FastAPI, Depends
from sqlalchemy import func, String, Float, DateTime, select
from sqlalchemy.ext.asyncio import create_async_engine, async_session, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
from datetime import datetime
app = FastAPI()

# 1.创造异步引擎
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_test?charset=utf8"
async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    echo = True,
    pool_size = 10,
    max_overflow = 20
)

# 2. 定义模型类 : 基类 + 表对应的模型类
class Base(DeclarativeBase):
    create_time : Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now,comment="创建时间")
    update_time : Mapped[datetime] = mapped_column(DateTime,insert_default=func.now(),default=func.now,onupdate=func.now(),comment="创建时间")

class Book(Base):
    __tablename__ = "book"
    id: Mapped[int] = mapped_column(primary_key=True,comment="书籍id")
    bookname: Mapped[str] = mapped_column(String(255),comment="书名")
    author: Mapped[str] = mapped_column(String(255), comment="作者")
    price: Mapped[float] = mapped_column(Float,comment="价格")
    publisher: Mapped[str] = mapped_column(String(255),comment="出版社")



AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_ = AsyncSession,
    expire_on_commit = False #提交后会话不过期,不会重新查询数据库
)

async def get_database():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

@app.get("/book/books")
async def get_book_list(db:AsyncSession = Depends(get_database)):
    result = await db.execute(select(Book))
    book = result.scalars().all()
    return book

3. 查询数据

  • 示例代码:
@app.get("/book/books")
async def get_book_list(db:AsyncSession = Depends(get_database)):
    # result = await db.execute(select(Book)) # 查询 -> 返回一个 ORM 对象
    # book = result.scalars().all()  # 获取所有
    # book = result.scalars().first()  # 获取第一条
    book = await db.get(Book,1) # 获取单条数据 -> 根据主键
    return book

3.1 条件查询

注意事项:静态路由一定要写在动态路由上面。

  • 示例代码:
@app.get("/book/search_price")
async def get_search_price(db:AsyncSession = Depends(get_database)):
    result = await db.execute(select(Book).where(Book.price > 40)) # 查询 -> 返回一个 ORM 对象
    # book = result.scalars().all()  # 获取所有
    book = result.scalars().all()
    return book


@app.get("/book/{book_id}")
async def get_book_list(book_id:int,db:AsyncSession = Depends(get_database)):
    result = await db.execute(select(Book).where(Book.id == book_id)) # 查询 -> 返回一个 ORM 对象
    # book = result.scalars().all()  # 获取所有
    book = result.scalar_one_or_none()
    return book

3.2 其他条件查询

  1. 比较判断: ==;>;<;>=;<=

  2. 模糊查询: like()

  3. 与非查询: &;|;~

  4. 包含查询: in_()

  • 示例代码:
@app.get("/book/search_author")
async def get_search_author(db:AsyncSession = Depends(get_database)):
    result = await db.execute(select(Book).where(Book.author.like("曹%")))
    book = result.scalars().all()
    return book