简体中文 繁體中文 English 日本語 Deutsch 한국 사람 بالعربية TÜRKÇE português คนไทย Français

站内搜索

搜索

活动公告

11-27 10:00
11-02 12:46
10-23 09:32
通知:本站资源由网友上传分享,如有违规等问题请到版务模块进行投诉,将及时处理!
10-23 09:31
10-23 09:28

FastAPI实战项目案例分享从API设计到系统优化涵盖认证授权数据库集成缓存策略异步处理等关键技术点助你掌握现代Web开发精髓

3万

主题

616

科技点

3万

积分

大区版主

碾压王

积分
31959

三倍冰淇淋无人之境【一阶】财Doro小樱(小丑装)立华奏以外的星空【二阶】

发表于 2025-10-6 21:00:30 | 显示全部楼层 |阅读模式 [标记阅至此楼]

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
引言

FastAPI是近年来备受瞩目的Python Web框架,它凭借出色的性能、直观的API设计和现代化的开发体验,迅速成为构建高性能API的首选框架之一。本文将通过一个完整的实战项目,深入探讨FastAPI的各项关键技术,帮助开发者从零开始构建一个生产级别的Web应用。

FastAPI的核心优势在于:

• 基于Starlette和Pydantic,提供高性能的异步处理能力
• 自动生成交互式API文档(Swagger UI)
• 内置数据验证和序列化功能
• 支持依赖注入系统,便于测试和模块化开发
• 类型提示支持,提供更好的IDE体验和代码质量

在本文中,我们将通过构建一个完整的博客系统来展示FastAPI的强大功能,涵盖从基础API设计到高级系统优化的全过程。

项目结构设计

良好的项目结构是成功的一半。对于FastAPI项目,推荐采用以下结构:
  1. blog_system/
  2. ├── app/
  3. │   ├── __init__.py
  4. │   ├── main.py           # FastAPI应用入口
  5. │   ├── core/             # 核心配置
  6. │   │   ├── __init__.py
  7. │   │   ├── config.py     # 配置管理
  8. │   │   └── security.py   # 安全相关功能
  9. │   ├── api/              # API路由
  10. │   │   ├── __init__.py
  11. │   │   ├── v1/
  12. │   │   │   ├── __init__.py
  13. │   │   │   ├── endpoints/
  14. │   │   │   │   ├── __init__.py
  15. │   │   │   │   ├── posts.py
  16. │   │   │   │   ├── users.py
  17. │   │   │   │   └── comments.py
  18. │   │   │   └── api.py    # API路由注册
  19. │   ├── models/           # 数据模型
  20. │   │   ├── __init__.py
  21. │   │   ├── user.py
  22. │   │   ├── post.py
  23. │   │   └── comment.py
  24. │   ├── schemas/          # Pydantic模型
  25. │   │   ├── __init__.py
  26. │   │   ├── user.py
  27. │   │   ├── post.py
  28. │   │   └── comment.py
  29. │   ├── crud/             # 数据库操作
  30. │   │   ├── __init__.py
  31. │   │   ├── base.py
  32. │   │   ├── user.py
  33. │   │   ├── post.py
  34. │   │   └── comment.py
  35. │   ├── db/               # 数据库配置
  36. │   │   ├── __init__.py
  37. │   │   ├── session.py    # 数据库会话
  38. │   │   └── base_class.py # 基础模型类
  39. │   └── utils/            # 工具函数
  40. │       ├── __init__.py
  41. │       └── helpers.py
  42. ├── tests/                # 测试目录
  43. │   ├── __init__.py
  44. │   ├── conftest.py
  45. │   ├── test_api.py
  46. │   └── test_crud.py
  47. ├── alembic/              # 数据库迁移
  48. ├── requirements.txt      # 项目依赖
  49. ├── .env                  # 环境变量
  50. └── Dockerfile            # Docker配置
复制代码

这种结构遵循了领域驱动设计(DDD)的原则,将不同功能的代码分门别类地组织在一起,便于维护和扩展。

API设计与实现

RESTful API设计原则

在FastAPI中设计API时,我们遵循RESTful原则:

1. 使用HTTP动词表示操作类型(GET、POST、PUT、DELETE等)
2. 使用名词表示资源(如/users、/posts)
3. 使用嵌套资源表示关系(如/posts/{post_id}/comments)
4. 使用HTTP状态码表示操作结果
5. 支持内容协商(JSON、XML等)

基本路由实现

让我们从实现博客系统的基本路由开始:
  1. # app/main.py
  2. from fastapi import FastAPI
  3. from app.api.v1.api import api_router
  4. from app.core.config import settings
  5. from fastapi.middleware.cors import CORSMiddleware
  6. app = FastAPI(
  7.     title=settings.PROJECT_NAME,
  8.     openapi_url=f"{settings.API_V1_STR}/openapi.json"
  9. )
  10. # 设置CORS
  11. if settings.BACKEND_CORS_ORIGINS:
  12.     app.add_middleware(
  13.         CORSMiddleware,
  14.         allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
  15.         allow_credentials=True,
  16.         allow_methods=["*"],
  17.         allow_headers=["*"],
  18.     )
  19. app.include_router(api_router, prefix=settings.API_V1_STR)
  20. @app.get("/")
  21. async def root():
  22.     return {"message": "Welcome to Blog System API"}
复制代码
  1. # app/api/v1/api.py
  2. from fastapi import APIRouter
  3. from app.api.v1.endpoints import posts, users, comments
  4. api_router = APIRouter()
  5. api_router.include_router(users.router, prefix="/users", tags=["users"])
  6. api_router.include_router(posts.router, prefix="/posts", tags=["posts"])
  7. api_router.include_router(comments.router, prefix="/comments", tags=["comments"])
复制代码
  1. # app/api/v1/endpoints/posts.py
  2. from typing import List, Optional
  3. from fastapi import APIRouter, Depends, HTTPException, Query
  4. from sqlalchemy.orm import Session
  5. from app import crud, models, schemas
  6. from app.api import deps
  7. router = APIRouter()
  8. @router.get("/", response_model=List[schemas.Post])
  9. def read_posts(
  10.     db: Session = Depends(deps.get_db),
  11.     skip: int = 0,
  12.     limit: int = 100,
  13.     search: Optional[str] = Query(None, min_length=3, description="Search in post title and content")
  14. ):
  15.     """
  16.     Retrieve posts.
  17.     """
  18.     if search:
  19.         posts = crud.post.search(db, search=search, skip=skip, limit=limit)
  20.     else:
  21.         posts = crud.post.get_multi(db, skip=skip, limit=limit)
  22.     return posts
  23. @router.post("/", response_model=schemas.Post)
  24. def create_post(
  25.     *,
  26.     db: Session = Depends(deps.get_db),
  27.     post_in: schemas.PostCreate,
  28.     current_user: models.User = Depends(deps.get_current_active_user),
  29. ):
  30.     """
  31.     Create new post.
  32.     """
  33.     post = crud.post.create_with_owner(db=db, obj_in=post_in, owner_id=current_user.id)
  34.     return post
  35. @router.get("/{post_id}", response_model=schemas.Post)
  36. def read_post(
  37.     *,
  38.     db: Session = Depends(deps.get_db),
  39.     post_id: int,
  40. ):
  41.     """
  42.     Get post by ID.
  43.     """
  44.     post = crud.post.get(db=db, id=post_id)
  45.     if not post:
  46.         raise HTTPException(status_code=404, detail="Post not found")
  47.     return post
  48. @router.put("/{post_id}", response_model=schemas.Post)
  49. def update_post(
  50.     *,
  51.     db: Session = Depends(deps.get_db),
  52.     post_id: int,
  53.     post_in: schemas.PostUpdate,
  54.     current_user: models.User = Depends(deps.get_current_active_user),
  55. ):
  56.     """
  57.     Update a post.
  58.     """
  59.     post = crud.post.get(db=db, id=post_id)
  60.     if not post:
  61.         raise HTTPException(status_code=404, detail="Post not found")
  62.     if post.owner_id != current_user.id:
  63.         raise HTTPException(status_code=403, detail="Not enough permissions")
  64.     post = crud.post.update(db=db, db_obj=post, obj_in=post_in)
  65.     return post
  66. @router.delete("/{post_id}", response_model=schemas.Post)
  67. def delete_post(
  68.     *,
  69.     db: Session = Depends(deps.get_db),
  70.     post_id: int,
  71.     current_user: models.User = Depends(deps.get_current_active_user),
  72. ):
  73.     """
  74.     Delete a post.
  75.     """
  76.     post = crud.post.get(db=db, id=post_id)
  77.     if not post:
  78.         raise HTTPException(status_code=404, detail="Post not found")
  79.     if post.owner_id != current_user.id:
  80.         raise HTTPException(status_code=403, detail="Not enough permissions")
  81.     post = crud.post.remove(db=db, id=post_id)
  82.     return post
复制代码

数据模型与Pydantic Schema

在FastAPI中,我们使用SQLAlchemy定义数据库模型,使用Pydantic定义请求和响应的数据结构:
  1. # app/models/post.py
  2. from sqlalchemy import Boolean, Column, Integer, String, Text, ForeignKey, DateTime
  3. from sqlalchemy.orm import relationship
  4. from sqlalchemy.sql import func
  5. from app.db.base_class import Base
  6. class Post(Base):
  7.     id = Column(Integer, primary_key=True, index=True)
  8.     title = Column(String(256), nullable=False, index=True)
  9.     content = Column(Text, nullable=False)
  10.     published = Column(Boolean, default=False)
  11.     created_at = Column(DateTime(timezone=True), server_default=func.now())
  12.     updated_at = Column(DateTime(timezone=True), onupdate=func.now())
  13.     owner_id = Column(Integer, ForeignKey("user.id"), nullable=False)
  14.    
  15.     owner = relationship("User", back_populates="posts")
  16.     comments = relationship("Comment", back_populates="post", cascade="all, delete-orphan")
复制代码
  1. # app/schemas/post.py
  2. from typing import Optional, List
  3. from datetime import datetime
  4. from pydantic import BaseModel
  5. # Base schema with common attributes
  6. class PostBase(BaseModel):
  7.     title: str
  8.     content: str
  9.     published: bool = False
  10. # Schema for creating a new post
  11. class PostCreate(PostBase):
  12.     pass
  13. # Schema for updating a post
  14. class PostUpdate(PostBase):
  15.     title: Optional[str] = None
  16.     content: Optional[str] = None
  17.     published: Optional[bool] = None
  18. # Schema for returning post data
  19. class Post(PostBase):
  20.     id: int
  21.     created_at: datetime
  22.     updated_at: Optional[datetime] = None
  23.     owner_id: int
  24.    
  25.     class Config:
  26.         orm_mode = True
复制代码

认证与授权

JWT认证实现

在FastAPI中实现JWT认证非常简单,我们可以使用OAuth2密码流和JWT令牌:
  1. # app/core/security.py
  2. from datetime import datetime, timedelta
  3. from typing import Any, Union, Optional
  4. from jose import jwt
  5. from passlib.context import CryptContext
  6. from app.core.config import settings
  7. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  8. def create_access_token(
  9.     subject: Union[str, Any], expires_delta: Optional[timedelta] = None
  10. ) -> str:
  11.     """
  12.     创建JWT访问令牌
  13.     """
  14.     if expires_delta:
  15.         expire = datetime.utcnow() + expires_delta
  16.     else:
  17.         expire = datetime.utcnow() + timedelta(
  18.             minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
  19.         )
  20.     to_encode = {"exp": expire, "sub": str(subject)}
  21.     encoded_jwt = jwt.encode(
  22.         to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM
  23.     )
  24.     return encoded_jwt
  25. def verify_password(plain_password: str, hashed_password: str) -> bool:
  26.     """
  27.     验证密码
  28.     """
  29.     return pwd_context.verify(plain_password, hashed_password)
  30. def get_password_hash(password: str) -> str:
  31.     """
  32.     获取密码哈希值
  33.     """
  34.     return pwd_context.hash(password)
复制代码
  1. # app/api/v1/endpoints/users.py
  2. from datetime import timedelta
  3. from typing import Any
  4. from fastapi import APIRouter, Depends, HTTPException
  5. from fastapi.security import OAuth2PasswordRequestForm
  6. from sqlalchemy.orm import Session
  7. from app import crud, models, schemas
  8. from app.api import deps
  9. from app.core import security
  10. from app.core.config import settings
  11. router = APIRouter()
  12. @router.post("/login/access-token", response_model=schemas.Token)
  13. def login_access_token(
  14.     db: Session = Depends(deps.get_db),
  15.     form_data: OAuth2PasswordRequestForm = Depends()
  16. ) -> Any:
  17.     """
  18.     OAuth2 compatible token login, get an access token for future requests
  19.     """
  20.     user = crud.user.authenticate(
  21.         db, email=form_data.username, password=form_data.password
  22.     )
  23.     if not user:
  24.         raise HTTPException(status_code=400, detail="Incorrect email or password")
  25.     elif not crud.user.is_active(user):
  26.         raise HTTPException(status_code=400, detail="Inactive user")
  27.     access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
  28.     return {
  29.         "access_token": security.create_access_token(
  30.             user.id, expires_delta=access_token_expires
  31.         ),
  32.         "token_type": "bearer",
  33.     }
复制代码

权限控制

FastAPI的依赖注入系统使得实现权限控制变得非常简单:
  1. # app/api/deps.py
  2. from fastapi import Depends, HTTPException, status
  3. from fastapi.security import OAuth2PasswordBearer
  4. from jose import jwt
  5. from pydantic import ValidationError
  6. from sqlalchemy.orm import Session
  7. from app import crud, models, schemas
  8. from app.core import security
  9. from app.core.config import settings
  10. from app.db.session import SessionLocal
  11. oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/login/access-token")
  12. def get_db() -> Session:
  13.     """
  14.     获取数据库会话
  15.     """
  16.     db = SessionLocal()
  17.     try:
  18.         yield db
  19.     finally:
  20.         db.close()
  21. def get_current_user(
  22.     db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
  23. ) -> models.User:
  24.     """
  25.     获取当前用户
  26.     """
  27.     try:
  28.         payload = jwt.decode(
  29.             token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
  30.         )
  31.         token_data = schemas.TokenPayload(**payload)
  32.     except (jwt.JWTError, ValidationError):
  33.         raise HTTPException(
  34.             status_code=status.HTTP_403_FORBIDDEN,
  35.             detail="Could not validate credentials",
  36.         )
  37.     user = crud.user.get(db, id=token_data.sub)
  38.     if not user:
  39.         raise HTTPException(status_code=404, detail="User not found")
  40.     return user
  41. def get_current_active_user(
  42.     current_user: models.User = Depends(get_current_user),
  43. ) -> models.User:
  44.     """
  45.     获取当前活跃用户
  46.     """
  47.     if not crud.user.is_active(current_user):
  48.         raise HTTPException(status_code=400, detail="Inactive user")
  49.     return current_user
  50. def get_current_active_superuser(
  51.     current_user: models.User = Depends(get_current_user),
  52. ) -> models.User:
  53.     """
  54.     获取当前活跃超级用户
  55.     """
  56.     if not crud.user.is_superuser(current_user):
  57.         raise HTTPException(
  58.             status_code=400, detail="The user doesn't have enough privileges"
  59.         )
  60.     return current_user
复制代码

使用这些依赖项,我们可以轻松地保护API端点:
  1. # app/api/v1/endpoints/users.py
  2. @router.get("/me", response_model=schemas.User)
  3. def read_user_me(
  4.     current_user: models.User = Depends(deps.get_current_active_user),
  5. ):
  6.     """
  7.     获取当前用户信息
  8.     """
  9.     return current_user
  10. @router.post("/", response_model=schemas.User)
  11. def create_user(
  12.     *,
  13.     db: Session = Depends(deps.get_db),
  14.     user_in: schemas.UserCreate,
  15.     current_user: models.User = Depends(deps.get_current_active_superuser),
  16. ):
  17.     """
  18.     创建新用户(仅限超级用户)
  19.     """
  20.     user = crud.user.get_by_email(db, email=user_in.email)
  21.     if user:
  22.         raise HTTPException(
  23.             status_code=400,
  24.             detail="The user with this email already exists in the system.",
  25.         )
  26.     user = crud.user.create(db, obj_in=user_in)
  27.     return user
复制代码

数据库集成

SQLAlchemy配置

FastAPI与SQLAlchemy的集成非常顺畅,我们可以使用异步SQLAlchemy来提高性能:
  1. # app/db/session.py
  2. from sqlalchemy import create_engine
  3. from sqlalchemy.ext.declarative import declarative_base
  4. from sqlalchemy.orm import sessionmaker
  5. from app.core.config import settings
  6. # 创建同步SQLAlchemy引擎
  7. engine = create_engine(settings.SQLALCHEMY_DATABASE_URI, pool_pre_ping=True)
  8. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  9. # 创建异步SQLAlchemy引擎
  10. from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
  11. async_engine = create_async_engine(settings.SQLALCHEMY_ASYNC_DATABASE_URI)
  12. AsyncSessionLocal = sessionmaker(
  13.     async_engine, class_=AsyncSession, expire_on_commit=False
  14. )
  15. # 基础模型类
  16. Base = declarative_base()
  17. # 获取数据库会话的依赖项
  18. def get_db() -> Session:
  19.     db = SessionLocal()
  20.     try:
  21.         yield db
  22.     finally:
  23.         db.close()
  24. # 获取异步数据库会话的依赖项
  25. async def get_async_db() -> AsyncSession:
  26.     async with AsyncSessionLocal() as session:
  27.         try:
  28.             yield session
  29.         finally:
  30.             await session.close()
复制代码

CRUD操作

使用SQLAlchemy ORM进行CRUD操作:
  1. # app/crud/base.py
  2. from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
  3. from fastapi.encoders import jsonable_encoder
  4. from pydantic import BaseModel
  5. from sqlalchemy.orm import Session
  6. from app.db.base_class import Base
  7. ModelType = TypeVar("ModelType", bound=Base)
  8. CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
  9. UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
  10. class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
  11.     def __init__(self, model: Type[ModelType]):
  12.         """
  13.         CRUD object with default methods to Create, Read, Update, Delete (CRUD).
  14.         """
  15.         self.model = model
  16.     def get(self, db: Session, id: Any) -> Optional[ModelType]:
  17.         return db.query(self.model).filter(self.model.id == id).first()
  18.     def get_multi(
  19.         self, db: Session, *, skip: int = 0, limit: int = 100
  20.     ) -> List[ModelType]:
  21.         return db.query(self.model).offset(skip).limit(limit).all()
  22.     def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
  23.         obj_in_data = jsonable_encoder(obj_in)
  24.         db_obj = self.model(**obj_in_data)  # type: ignore
  25.         db.add(db_obj)
  26.         db.commit()
  27.         db.refresh(db_obj)
  28.         return db_obj
  29.     def update(
  30.         self,
  31.         db: Session,
  32.         *,
  33.         db_obj: ModelType,
  34.         obj_in: Union[UpdateSchemaType, Dict[str, Any]]
  35.     ) -> ModelType:
  36.         obj_data = jsonable_encoder(db_obj)
  37.         if isinstance(obj_in, dict):
  38.             update_data = obj_in
  39.         else:
  40.             update_data = obj_in.dict(exclude_unset=True)
  41.         for field in obj_data:
  42.             if field in update_data:
  43.                 setattr(db_obj, field, update_data[field])
  44.         db.add(db_obj)
  45.         db.commit()
  46.         db.refresh(db_obj)
  47.         return db_obj
  48.     def remove(self, db: Session, *, id: int) -> ModelType:
  49.         obj = db.query(self.model).get(id)
  50.         db.delete(obj)
  51.         db.commit()
  52.         return obj
复制代码
  1. # app/crud/post.py
  2. from typing import List, Optional
  3. from sqlalchemy.orm import Session
  4. from sqlalchemy import or_
  5. from app.crud.base import CRUDBase
  6. from app.models.post import Post
  7. from app.schemas.post import PostCreate, PostUpdate
  8. class CRUDPost(CRUDBase[Post, PostCreate, PostUpdate]):
  9.     def create_with_owner(
  10.         self, db: Session, *, obj_in: PostCreate, owner_id: int
  11.     ) -> Post:
  12.         obj_in_data = obj_in.dict()
  13.         db_obj = Post(**obj_in_data, owner_id=owner_id)
  14.         db.add(db_obj)
  15.         db.commit()
  16.         db.refresh(db_obj)
  17.         return db_obj
  18.    
  19.     def get_multi_by_owner(
  20.         self, db: Session, *, owner_id: int, skip: int = 0, limit: int = 100
  21.     ) -> List[Post]:
  22.         return (
  23.             db.query(self.model)
  24.             .filter(Post.owner_id == owner_id)
  25.             .offset(skip)
  26.             .limit(limit)
  27.             .all()
  28.         )
  29.    
  30.     def search(
  31.         self, db: Session, *, search: str, skip: int = 0, limit: int = 100
  32.     ) -> List[Post]:
  33.         return (
  34.             db.query(self.model)
  35.             .filter(
  36.                 or_(
  37.                     Post.title.ilike(f"%{search}%"),
  38.                     Post.content.ilike(f"%{search}%")
  39.                 )
  40.             )
  41.             .offset(skip)
  42.             .limit(limit)
  43.             .all()
  44.         )
  45. post = CRUDPost(Post)
复制代码

异步数据库操作

FastAPI支持异步操作,我们可以使用SQLAlchemy的异步API来提高性能:
  1. # app/crud/async_post.py
  2. from typing import List, Optional
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from sqlalchemy import select, or_
  5. from app.models.post import Post
  6. from app.schemas.post import PostCreate, PostUpdate
  7. async def async_get_post(db: AsyncSession, post_id: int) -> Optional[Post]:
  8.     result = await db.execute(select(Post).where(Post.id == post_id))
  9.     return result.scalar_one_or_none()
  10. async def async_get_posts(
  11.     db: AsyncSession, skip: int = 0, limit: int = 100
  12. ) -> List[Post]:
  13.     result = await db.execute(select(Post).offset(skip).limit(limit))
  14.     return result.scalars().all()
  15. async def async_create_post(
  16.     db: AsyncSession, *, post_in: PostCreate, owner_id: int
  17. ) -> Post:
  18.     db_post = Post(**post_in.dict(), owner_id=owner_id)
  19.     db.add(db_post)
  20.     await db.commit()
  21.     await db.refresh(db_post)
  22.     return db_post
  23. async def async_search_posts(
  24.     db: AsyncSession, *, search: str, skip: int = 0, limit: int = 100
  25. ) -> List[Post]:
  26.     result = await db.execute(
  27.         select(Post)
  28.         .where(
  29.             or_(
  30.                 Post.title.ilike(f"%{search}%"),
  31.                 Post.content.ilike(f"%{search}%")
  32.             )
  33.         )
  34.         .offset(skip)
  35.         .limit(limit)
  36.     )
  37.     return result.scalars().all()
复制代码

缓存策略

Redis缓存集成

缓存是提高API性能的重要手段,我们可以使用Redis来实现缓存:
  1. # app/core/redis.py
  2. import redis
  3. import json
  4. from app.core.config import settings
  5. # 创建Redis连接
  6. redis_client = redis.Redis(
  7.     host=settings.REDIS_HOST,
  8.     port=settings.REDIS_PORT,
  9.     db=settings.REDIS_DB,
  10.     password=settings.REDIS_PASSWORD,
  11.     decode_responses=True
  12. )
  13. def get_from_cache(key: str):
  14.     """
  15.     从缓存中获取数据
  16.     """
  17.     try:
  18.         data = redis_client.get(key)
  19.         if data:
  20.             return json.loads(data)
  21.         return None
  22.     except Exception as e:
  23.         print(f"Redis GET error: {e}")
  24.         return None
  25. def set_to_cache(key: str, value: dict, expire: int = 3600):
  26.     """
  27.     设置缓存数据
  28.     """
  29.     try:
  30.         redis_client.setex(key, expire, json.dumps(value))
  31.     except Exception as e:
  32.         print(f"Redis SET error: {e}")
  33. def delete_from_cache(key: str):
  34.     """
  35.     删除缓存数据
  36.     """
  37.     try:
  38.         redis_client.delete(key)
  39.     except Exception as e:
  40.         print(f"Redis DELETE error: {e}")
复制代码

缓存装饰器

我们可以创建一个缓存装饰器,方便在API端点中使用缓存:
  1. # app/utils/cache.py
  2. import json
  3. from functools import wraps
  4. from fastapi import Request, Response
  5. from app.core.redis import get_from_cache, set_to_cache, delete_from_cache
  6. def cache_response(expire: int = 3600):
  7.     """
  8.     缓存响应装饰器
  9.     """
  10.     def decorator(func):
  11.         @wraps(func)
  12.         async def wrapper(*args, **kwargs):
  13.             # 尝试从kwargs中获取request对象
  14.             request = None
  15.             for arg in args:
  16.                 if isinstance(arg, Request):
  17.                     request = arg
  18.                     break
  19.             
  20.             if request:
  21.                 # 生成缓存键
  22.                 cache_key = f"{request.url.path}:{json.dumps(request.query_params)}"
  23.                
  24.                 # 尝试从缓存获取数据
  25.                 cached_data = get_from_cache(cache_key)
  26.                 if cached_data:
  27.                     return cached_data
  28.                
  29.                 # 执行原始函数
  30.                 response = await func(*args, **kwargs)
  31.                
  32.                 # 将响应存入缓存
  33.                 if isinstance(response, Response):
  34.                     set_to_cache(cache_key, response.body.decode(), expire)
  35.                 else:
  36.                     set_to_cache(cache_key, response, expire)
  37.                
  38.                 return response
  39.             else:
  40.                 # 如果没有request对象,直接执行原始函数
  41.                 return await func(*args, **kwargs)
  42.         
  43.         return wrapper
  44.     return decorator
  45. def invalidate_cache(pattern: str):
  46.     """
  47.     缓存失效装饰器
  48.     """
  49.     def decorator(func):
  50.         @wraps(func)
  51.         async def wrapper(*args, **kwargs):
  52.             # 执行原始函数
  53.             result = await func(*args, **kwargs)
  54.             
  55.             # 使匹配模式的缓存失效
  56.             try:
  57.                 from app.core.redis import redis_client
  58.                 for key in redis_client.scan_iter(match=pattern):
  59.                     redis_client.delete(key)
  60.             except Exception as e:
  61.                 print(f"Cache invalidation error: {e}")
  62.             
  63.             return result
  64.         
  65.         return wrapper
  66.     return decorator
复制代码

在API中使用缓存

现在我们可以在API端点中使用缓存:
  1. # app/api/v1/endpoints/posts.py
  2. from fastapi import APIRouter, Depends, HTTPException, Query, Request
  3. from sqlalchemy.orm import Session
  4. from app import crud, models, schemas
  5. from app.api import deps
  6. from app.utils.cache import cache_response, invalidate_cache
  7. router = APIRouter()
  8. @router.get("/", response_model=List[schemas.Post])
  9. @cache_response(expire=1800)  # 缓存30分钟
  10. def read_posts(
  11.     request: Request,
  12.     db: Session = Depends(deps.get_db),
  13.     skip: int = 0,
  14.     limit: int = 100,
  15.     search: str = None,
  16. ):
  17.     """
  18.     检索文章列表,支持缓存
  19.     """
  20.     if search:
  21.         posts = crud.post.search(db, search=search, skip=skip, limit=limit)
  22.     else:
  23.         posts = crud.post.get_multi(db, skip=skip, limit=limit)
  24.     return posts
  25. @router.post("/", response_model=schemas.Post)
  26. @invalidate_cache(pattern="/api/v1/posts*")  # 使文章列表缓存失效
  27. def create_post(
  28.     *,
  29.     db: Session = Depends(deps.get_db),
  30.     post_in: schemas.PostCreate,
  31.     current_user: models.User = Depends(deps.get_current_active_user),
  32. ):
  33.     """
  34.     创建新文章,并使相关缓存失效
  35.     """
  36.     post = crud.post.create_with_owner(db=db, obj_in=post_in, owner_id=current_user.id)
  37.     return post
复制代码

异步处理

异步任务处理

FastAPI原生支持异步操作,我们可以使用它来处理长时间运行的任务:
  1. # app/utils/tasks.py
  2. import asyncio
  3. from typing import Dict, Any
  4. from fastapi import BackgroundTasks
  5. from app.core.redis import redis_client
  6. import json
  7. async def process_data(data: Dict[str, Any]):
  8.     """
  9.     模拟一个长时间运行的数据处理任务
  10.     """
  11.     # 模拟耗时操作
  12.     await asyncio.sleep(10)
  13.    
  14.     # 处理数据
  15.     processed_data = {k: v.upper() if isinstance(v, str) else v for k, v in data.items()}
  16.    
  17.     # 将结果存储到Redis
  18.     task_id = data.get("task_id")
  19.     if task_id:
  20.         redis_client.setex(f"task_result:{task_id}", 3600, json.dumps(processed_data))
  21.    
  22.     return processed_data
  23. def run_task_in_background(background_tasks: BackgroundTasks, data: Dict[str, Any]):
  24.     """
  25.     在后台运行任务
  26.     """
  27.     background_tasks.add_task(process_data, data)
复制代码
  1. # app/api/v1/endpoints/tasks.py
  2. from fastapi import APIRouter, BackgroundTasks, HTTPException
  3. from app.utils.tasks import run_task_in_background
  4. from app.core.redis import redis_client
  5. import json
  6. import uuid
  7. router = APIRouter()
  8. @router.post("/process")
  9. async def create_processing_task(background_tasks: BackgroundTasks):
  10.     """
  11.     创建一个数据处理任务
  12.     """
  13.     task_id = str(uuid.uuid4())
  14.     data = {"task_id": task_id, "name": "sample data", "value": 42}
  15.    
  16.     # 在后台运行任务
  17.     run_task_in_background(background_tasks, data)
  18.    
  19.     return {"task_id": task_id, "message": "Task started"}
  20. @router.get("/status/{task_id}")
  21. async def get_task_status(task_id: str):
  22.     """
  23.     获取任务状态和结果
  24.     """
  25.     result = redis_client.get(f"task_result:{task_id}")
  26.     if result:
  27.         return {"task_id": task_id, "status": "completed", "result": json.loads(result)}
  28.     else:
  29.         return {"task_id": task_id, "status": "processing"}
复制代码

使用Celery进行分布式任务处理

对于更复杂的异步任务处理,我们可以集成Celery:
  1. # app/core/celery_app.py
  2. from celery import Celery
  3. from app.core.config import settings
  4. # 创建Celery实例
  5. celery_app = Celery(
  6.     "blog_system",
  7.     broker=settings.CELERY_BROKER_URL,
  8.     backend=settings.CELERY_RESULT_BACKEND,
  9.     include=["app.tasks"]
  10. )
  11. # Celery配置
  12. celery_app.conf.update(
  13.     task_serializer="json",
  14.     result_serializer="json",
  15.     accept_content=["json"],
  16.     timezone="UTC",
  17.     enable_utc=True,
  18.     task_track_started=True,
  19.     task_time_limit=30 * 60,  # 30分钟超时
  20.     task_soft_time_limit=25 * 60,  # 25分钟软超时
  21.     worker_prefetch_multiplier=1,  # 每个worker一次只取一个任务
  22. )
复制代码
  1. # app/tasks.py
  2. from app.core.celery_app import celery_app
  3. from app.core.redis import redis_client
  4. import json
  5. import time
  6. @celery_app.task(bind=True)
  7. def process_data_task(self, data):
  8.     """
  9.     使用Celery处理数据的任务
  10.     """
  11.     try:
  12.         # 更新任务状态
  13.         self.update_state(
  14.             state="PROGRESS",
  15.             meta={"current": 1, "total": 3, "status": "Starting data processing"}
  16.         )
  17.         
  18.         # 模拟数据处理步骤1
  19.         time.sleep(5)
  20.         processed_data = {k: v.upper() if isinstance(v, str) else v for k, v in data.items()}
  21.         
  22.         # 更新任务状态
  23.         self.update_state(
  24.             state="PROGRESS",
  25.             meta={"current": 2, "total": 3, "status": "Processing data"}
  26.         )
  27.         
  28.         # 模拟数据处理步骤2
  29.         time.sleep(5)
  30.         processed_data["processed_at"] = time.time()
  31.         
  32.         # 更新任务状态
  33.         self.update_state(
  34.             state="PROGRESS",
  35.             meta={"current": 3, "total": 3, "status": "Saving results"}
  36.         )
  37.         
  38.         # 模拟数据处理步骤3
  39.         time.sleep(5)
  40.         task_id = data.get("task_id")
  41.         if task_id:
  42.             redis_client.setex(f"celery_task_result:{task_id}", 3600, json.dumps(processed_data))
  43.         
  44.         # 返回结果
  45.         return {"status": "completed", "result": processed_data}
  46.    
  47.     except Exception as e:
  48.         # 更新任务状态为失败
  49.         self.update_state(
  50.             state="FAILURE",
  51.             meta={"current": 3, "total": 3, "status": f"Error: {str(e)}"}
  52.         )
  53.         raise e
复制代码
  1. # app/api/v1/endpoints/celery_tasks.py
  2. from fastapi import APIRouter, HTTPException
  3. from app.tasks import process_data_task
  4. from app.core.redis import redis_client
  5. import json
  6. import uuid
  7. router = APIRouter()
  8. @router.post("/process")
  9. async def create_celery_task():
  10.     """
  11.     创建一个Celery数据处理任务
  12.     """
  13.     task_id = str(uuid.uuid4())
  14.     data = {"task_id": task_id, "name": "sample data", "value": 42}
  15.    
  16.     # 启动Celery任务
  17.     task = process_data_task.delay(data)
  18.    
  19.     return {"task_id": task_id, "celery_task_id": task.id, "message": "Task started"}
  20. @router.get("/status/{task_id}")
  21. async def get_celery_task_status(task_id: str):
  22.     """
  23.     获取Celery任务状态和结果
  24.     """
  25.     # 从Redis获取结果
  26.     result = redis_client.get(f"celery_task_result:{task_id}")
  27.     if result:
  28.         return {"task_id": task_id, "status": "completed", "result": json.loads(result)}
  29.    
  30.     # 如果Redis中没有结果,返回任务处理中
  31.     return {"task_id": task_id, "status": "processing"}
复制代码

性能优化

数据库优化

数据库查询优化是提高API性能的关键:
  1. # app/crud/optimized_post.py
  2. from typing import List, Optional
  3. from sqlalchemy.orm import Session, joinedload
  4. from sqlalchemy import func, or_
  5. from app.models.post import Post
  6. from app.models.user import User
  7. from app.schemas.post import PostCreate, PostUpdate
  8. def get_post_with_author(db: Session, post_id: int) -> Optional[Post]:
  9.     """
  10.     获取文章及其作者信息(使用joinedload优化查询)
  11.     """
  12.     return (
  13.         db.query(Post)
  14.         .options(joinedload(Post.owner))
  15.         .filter(Post.id == post_id)
  16.         .first()
  17.     )
  18. def get_posts_with_pagination(
  19.     db: Session, skip: int = 0, limit: int = 100
  20. ) -> tuple[List[Post], int]:
  21.     """
  22.     获取文章列表及总数(用于分页)
  23.     """
  24.     # 使用一个查询获取总数
  25.     total = db.query(func.count(Post.id)).scalar()
  26.    
  27.     # 使用另一个查询获取分页数据
  28.     posts = (
  29.         db.query(Post)
  30.         .options(joinedload(Post.owner))
  31.         .offset(skip)
  32.         .limit(limit)
  33.         .all()
  34.     )
  35.    
  36.     return posts, total
  37. def get_popular_posts(db: Session, limit: int = 10) -> List[Post]:
  38.     """
  39.     获取热门文章(基于评论数量)
  40.     """
  41.     return (
  42.         db.query(Post)
  43.         .join(Post.comments)
  44.         .group_by(Post.id)
  45.         .order_by(func.count(Post.comments).desc())
  46.         .limit(limit)
  47.         .all()
  48.     )
  49. def search_posts_optimized(
  50.     db: Session, *, search: str, skip: int = 0, limit: int = 100
  51. ) -> List[Post]:
  52.     """
  53.     优化的文章搜索(使用全文索引)
  54.     """
  55.     return (
  56.         db.query(Post)
  57.         .filter(
  58.             or_(
  59.                 Post.title.ilike(f"%{search}%"),
  60.                 Post.content.ilike(f"%{search}%")
  61.             )
  62.         )
  63.         .options(joinedload(Post.owner))
  64.         .offset(skip)
  65.         .limit(limit)
  66.         .all()
  67.     )
复制代码

响应压缩

FastAPI支持响应压缩,可以显著减少传输数据量:
  1. # app/main.py
  2. from fastapi import FastAPI
  3. from fastapi.middleware.gzip import GZipMiddleware
  4. from app.api.v1.api import api_router
  5. from app.core.config import settings
  6. app = FastAPI(
  7.     title=settings.PROJECT_NAME,
  8.     openapi_url=f"{settings.API_V1_STR}/openapi.json"
  9. )
  10. # 添加GZip中间件
  11. app.add_middleware(GZipMiddleware, minimum_size=1000)
  12. app.include_router(api_router, prefix=settings.API_V1_STR)
复制代码

异步端点

使用异步端点可以提高并发性能:
  1. # app/api/v1/endpoints/async_posts.py
  2. from typing import List, Optional
  3. from fastapi import APIRouter, Depends, HTTPException, Query
  4. from sqlalchemy.ext.asyncio import AsyncSession
  5. from sqlalchemy import select, or_, func
  6. from app.models.post import Post
  7. from app.models.user import User
  8. from app.schemas.post import Post, PostCreate
  9. from app.api.deps import get_async_db
  10. from app.crud.async_post import async_create_post
  11. router = APIRouter()
  12. @router.get("/", response_model=List[Post])
  13. async def read_posts(
  14.     db: AsyncSession = Depends(get_async_db),
  15.     skip: int = 0,
  16.     limit: int = 100,
  17.     search: Optional[str] = Query(None, min_length=3),
  18. ):
  19.     """
  20.     异步获取文章列表
  21.     """
  22.     if search:
  23.         query = (
  24.             select(Post)
  25.             .where(
  26.                 or_(
  27.                     Post.title.ilike(f"%{search}%"),
  28.                     Post.content.ilike(f"%{search}%")
  29.                 )
  30.             )
  31.             .offset(skip)
  32.             .limit(limit)
  33.         )
  34.     else:
  35.         query = select(Post).offset(skip).limit(limit)
  36.    
  37.     result = await db.execute(query)
  38.     return result.scalars().all()
  39. @router.post("/", response_model=Post)
  40. async def create_post(
  41.     *,
  42.     db: AsyncSession = Depends(get_async_db),
  43.     post_in: PostCreate,
  44.     # 假设我们有一个异步的get_current_active_user函数
  45.     current_user: User = Depends(get_current_active_user),
  46. ):
  47.     """
  48.     异步创建新文章
  49.     """
  50.     return await async_create_post(db=db, post_in=post_in, owner_id=current_user.id)
复制代码

测试策略

单元测试

使用pytest进行单元测试:
  1. # tests/test_crud.py
  2. import pytest
  3. from sqlalchemy import create_engine
  4. from sqlalchemy.orm import sessionmaker
  5. from app.db.base_class import Base
  6. from app.crud.post import post
  7. from app.models.post import Post
  8. from app.schemas.post import PostCreate, PostUpdate
  9. from app.core.security import get_password_hash
  10. # 测试数据库
  11. SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
  12. engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
  13. TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  14. @pytest.fixture
  15. def db_session():
  16.     Base.metadata.create_all(bind=engine)
  17.     db = TestingSessionLocal()
  18.     try:
  19.         yield db
  20.     finally:
  21.         db.close()
  22.         Base.metadata.drop_all(bind=engine)
  23. def test_create_post(db_session):
  24.     title = "Test Post"
  25.     content = "This is a test post"
  26.     post_in = PostCreate(title=title, content=content)
  27.    
  28.     # 创建一个测试用户
  29.     from app.models.user import User
  30.     user = User(
  31.         email="test@example.com",
  32.         hashed_password=get_password_hash("testpassword"),
  33.         full_name="Test User",
  34.         is_active=True,
  35.     )
  36.     db_session.add(user)
  37.     db_session.commit()
  38.     db_session.refresh(user)
  39.    
  40.     # 创建文章
  41.     created_post = post.create_with_owner(db=db_session, obj_in=post_in, owner_id=user.id)
  42.    
  43.     assert created_post.title == title
  44.     assert created_post.content == content
  45.     assert created_post.owner_id == user.id
  46. def test_get_post(db_session):
  47.     title = "Test Post"
  48.     content = "This is a test post"
  49.     post_in = PostCreate(title=title, content=content)
  50.    
  51.     # 创建一个测试用户
  52.     from app.models.user import User
  53.     user = User(
  54.         email="test@example.com",
  55.         hashed_password=get_password_hash("testpassword"),
  56.         full_name="Test User",
  57.         is_active=True,
  58.     )
  59.     db_session.add(user)
  60.     db_session.commit()
  61.     db_session.refresh(user)
  62.    
  63.     # 创建文章
  64.     created_post = post.create_with_owner(db=db_session, obj_in=post_in, owner_id=user.id)
  65.    
  66.     # 获取文章
  67.     retrieved_post = post.get(db=db_session, id=created_post.id)
  68.    
  69.     assert retrieved_post
  70.     assert retrieved_post.id == created_post.id
  71.     assert retrieved_post.title == title
  72.     assert retrieved_post.content == content
  73. def test_update_post(db_session):
  74.     title = "Test Post"
  75.     content = "This is a test post"
  76.     post_in = PostCreate(title=title, content=content)
  77.    
  78.     # 创建一个测试用户
  79.     from app.models.user import User
  80.     user = User(
  81.         email="test@example.com",
  82.         hashed_password=get_password_hash("testpassword"),
  83.         full_name="Test User",
  84.         is_active=True,
  85.     )
  86.     db_session.add(user)
  87.     db_session.commit()
  88.     db_session.refresh(user)
  89.    
  90.     # 创建文章
  91.     created_post = post.create_with_owner(db=db_session, obj_in=post_in, owner_id=user.id)
  92.    
  93.     # 更新文章
  94.     new_title = "Updated Test Post"
  95.     post_update = PostUpdate(title=new_title)
  96.     updated_post = post.update(db=db_session, db_obj=created_post, obj_in=post_update)
  97.    
  98.     assert updated_post.title == new_title
  99.     assert updated_post.content == content  # 内容应保持不变
复制代码

API端点测试

使用httpx进行API端点测试:
  1. # tests/test_api.py
  2. import pytest
  3. from fastapi.testclient import TestClient
  4. from app.main import app
  5. from app.core.config import settings
  6. from app.core.security import create_access_token
  7. from app.models.user import User
  8. from app.schemas.user import UserCreate
  9. client = TestClient(app)
  10. @pytest.fixture
  11. def test_user():
  12.     return {
  13.         "email": "test@example.com",
  14.         "password": "testpassword",
  15.         "full_name": "Test User"
  16.     }
  17. @pytest.fixture
  18. def user_token_headers(test_user):
  19.     # 创建用户
  20.     response = client.post(
  21.         f"{settings.API_V1_STR}/users/", json=test_user
  22.     )
  23.     assert response.status_code == 200
  24.    
  25.     # 获取访问令牌
  26.     login_data = {
  27.         "username": test_user["email"],
  28.         "password": test_user["password"]
  29.     }
  30.     response = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
  31.     assert response.status_code == 200
  32.     token = response.json()["access_token"]
  33.    
  34.     return {"Authorization": f"Bearer {token}"}
  35. def test_create_post(user_token_headers):
  36.     post_data = {
  37.         "title": "Test Post",
  38.         "content": "This is a test post",
  39.         "published": False
  40.     }
  41.     response = client.post(
  42.         f"{settings.API_V1_STR}/posts/",
  43.         headers=user_token_headers,
  44.         json=post_data
  45.     )
  46.     assert response.status_code == 200
  47.     content = response.json()
  48.     assert content["title"] == post_data["title"]
  49.     assert content["content"] == post_data["content"]
  50.     assert content["published"] == post_data["published"]
  51.     assert "id" in content
  52.     assert "owner_id" in content
  53. def test_read_posts(user_token_headers):
  54.     # 先创建一个文章
  55.     post_data = {
  56.         "title": "Test Post for Reading",
  57.         "content": "This is a test post for reading",
  58.         "published": True
  59.     }
  60.     client.post(
  61.         f"{settings.API_V1_STR}/posts/",
  62.         headers=user_token_headers,
  63.         json=post_data
  64.     )
  65.    
  66.     # 读取文章列表
  67.     response = client.get(
  68.         f"{settings.API_V1_STR}/posts/",
  69.         headers=user_token_headers
  70.     )
  71.     assert response.status_code == 200
  72.     content = response.json()
  73.     assert len(content) >= 1
  74.     assert content[0]["title"] == post_data["title"]
  75. def test_update_post(user_token_headers):
  76.     # 先创建一个文章
  77.     post_data = {
  78.         "title": "Test Post for Update",
  79.         "content": "This is a test post for update",
  80.         "published": False
  81.     }
  82.     response = client.post(
  83.         f"{settings.API_V1_STR}/posts/",
  84.         headers=user_token_headers,
  85.         json=post_data
  86.     )
  87.     post_id = response.json()["id"]
  88.    
  89.     # 更新文章
  90.     update_data = {
  91.         "title": "Updated Test Post",
  92.         "published": True
  93.     }
  94.     response = client.put(
  95.         f"{settings.API_V1_STR}/posts/{post_id}",
  96.         headers=user_token_headers,
  97.         json=update_data
  98.     )
  99.     assert response.status_code == 200
  100.     content = response.json()
  101.     assert content["title"] == update_data["title"]
  102.     assert content["content"] == post_data["content"]  # 内容应保持不变
  103.     assert content["published"] == update_data["published"]
  104. def test_delete_post(user_token_headers):
  105.     # 先创建一个文章
  106.     post_data = {
  107.         "title": "Test Post for Deletion",
  108.         "content": "This is a test post for deletion",
  109.         "published": False
  110.     }
  111.     response = client.post(
  112.         f"{settings.API_V1_STR}/posts/",
  113.         headers=user_token_headers,
  114.         json=post_data
  115.     )
  116.     post_id = response.json()["id"]
  117.    
  118.     # 删除文章
  119.     response = client.delete(
  120.         f"{settings.API_V1_STR}/posts/{post_id}",
  121.         headers=user_token_headers
  122.     )
  123.     assert response.status_code == 200
  124.    
  125.     # 验证文章已被删除
  126.     response = client.get(
  127.         f"{settings.API_V1_STR}/posts/{post_id}",
  128.         headers=user_token_headers
  129.     )
  130.     assert response.status_code == 404
复制代码

部署与监控

Docker部署

使用Docker容器化部署FastAPI应用:
  1. # Dockerfile
  2. FROM python:3.9-slim
  3. # 设置工作目录
  4. WORKDIR /app
  5. # 设置环境变量
  6. ENV PYTHONDONTWRITEBYTECODE 1
  7. ENV PYTHONUNBUFFERED 1
  8. # 安装系统依赖
  9. RUN apt-get update \
  10.     && apt-get install -y --no-install-recommends \
  11.         gcc \
  12.         postgresql-client \
  13.         && rm -rf /var/lib/apt/lists/*
  14. # 安装Python依赖
  15. COPY requirements.txt .
  16. RUN pip install --no-cache-dir -r requirements.txt
  17. # 复制项目代码
  18. COPY . .
  19. # 收集静态文件(如果有)
  20. # RUN python manage.py collectstatic --noinput
  21. # 创建非root用户
  22. RUN adduser --disabled-password --gecos '' appuser
  23. RUN chown -R appuser:appuser /app
  24. USER appuser
  25. # 暴露端口
  26. EXPOSE 8000
  27. # 启动命令
  28. CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
复制代码
  1. # docker-compose.yml
  2. version: '3.8'
  3. services:
  4.   web:
  5.     build: .
  6.     ports:
  7.       - "8000:8000"
  8.     depends_on:
  9.       - db
  10.       - redis
  11.     environment:
  12.       - DATABASE_URL=postgresql://fastapi_user:fastapi_password@db:5432/blog_db
  13.       - REDIS_URL=redis://redis:6379/0
  14.     volumes:
  15.       - .:/app
  16.     command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
  17.   db:
  18.     image: postgres:13
  19.     environment:
  20.       - POSTGRES_USER=fastapi_user
  21.       - POSTGRES_PASSWORD=fastapi_password
  22.       - POSTGRES_DB=blog_db
  23.     volumes:
  24.       - postgres_data:/var/lib/postgresql/data/
  25.     ports:
  26.       - "5432:5432"
  27.   redis:
  28.     image: redis:6-alpine
  29.     ports:
  30.       - "6379:6379"
  31.   celery:
  32.     build: .
  33.     command: celery -A app.core.celery_app worker --loglevel=info
  34.     volumes:
  35.       - .:/app
  36.     depends_on:
  37.       - db
  38.       - redis
  39.     environment:
  40.       - DATABASE_URL=postgresql://fastapi_user:fastapi_password@db:5432/blog_db
  41.       - REDIS_URL=redis://redis:6379/0
  42.       - CELERY_BROKER_URL=redis://redis:6379/0
  43.       - CELERY_RESULT_BACKEND=redis://redis:6379/0
  44.   flower:
  45.     build: .
  46.     command: celery -A app.core.celery_app flower --port=5555
  47.     ports:
  48.       - "5555:5555"
  49.     depends_on:
  50.       - redis
  51.     environment:
  52.       - CELERY_BROKER_URL=redis://redis:6379/0
  53.       - CELERY_RESULT_BACKEND=redis://redis:6379/0
  54. volumes:
  55.   postgres_data:
复制代码

日志和监控

实现日志记录和监控:
  1. # app/core/logging.py
  2. import logging
  3. import sys
  4. from pathlib import Path
  5. from app.core.config import settings
  6. # 创建日志目录
  7. log_dir = Path("logs")
  8. log_dir.mkdir(exist_ok=True)
  9. # 配置日志
  10. def setup_logging():
  11.     """
  12.     配置应用日志
  13.     """
  14.     # 创建日志格式化器
  15.     formatter = logging.Formatter(
  16.         "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  17.     )
  18.    
  19.     # 创建文件处理器
  20.     file_handler = logging.FileHandler("logs/app.log")
  21.     file_handler.setFormatter(formatter)
  22.     file_handler.setLevel(logging.INFO)
  23.    
  24.     # 创建控制台处理器
  25.     console_handler = logging.StreamHandler(sys.stdout)
  26.     console_handler.setFormatter(formatter)
  27.     console_handler.setLevel(logging.INFO if not settings.DEBUG else logging.DEBUG)
  28.    
  29.     # 配置根日志记录器
  30.     root_logger = logging.getLogger()
  31.     root_logger.setLevel(logging.INFO if not settings.DEBUG else logging.DEBUG)
  32.     root_logger.addHandler(file_handler)
  33.     root_logger.addHandler(console_handler)
  34.    
  35.     # 配置第三方库日志级别
  36.     logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
  37.     logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)
  38.    
  39.     return root_logger
复制代码
  1. # app/main.py
  2. from fastapi import FastAPI, Request, Response
  3. from fastapi.middleware.gzip import GZipMiddleware
  4. from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
  5. import time
  6. import logging
  7. from app.api.v1.api import api_router
  8. from app.core.config import settings
  9. from app.core.logging import setup_logging
  10. # 设置日志
  11. logger = setup_logging()
  12. app = FastAPI(
  13.     title=settings.PROJECT_NAME,
  14.     openapi_url=f"{settings.API_V1_STR}/openapi.json"
  15. )
  16. # 添加中间件
  17. if settings.ENABLE_HTTPS:
  18.     app.add_middleware(HTTPSRedirectMiddleware)
  19. app.add_middleware(GZipMiddleware, minimum_size=1000)
  20. # 请求日志中间件
  21. @app.middleware("http")
  22. async def log_requests(request: Request, call_next):
  23.     start_time = time.time()
  24.    
  25.     # 记录请求信息
  26.     logger.info(f"Request started: {request.method} {request.url}")
  27.    
  28.     response = await call_next(request)
  29.    
  30.     # 计算处理时间
  31.     process_time = time.time() - start_time
  32.    
  33.     # 记录响应信息
  34.     logger.info(
  35.         f"Request completed: {request.method} {request.url} - Status: {response.status_code} - Time: {process_time:.4f}s"
  36.     )
  37.    
  38.     # 添加处理时间到响应头
  39.     response.headers["X-Process-Time"] = str(process_time)
  40.    
  41.     return response
  42. app.include_router(api_router, prefix=settings.API_V1_STR)
  43. @app.on_event("startup")
  44. async def startup_event():
  45.     logger.info("Application startup...")
  46. @app.on_event("shutdown")
  47. async def shutdown_event():
  48.     logger.info("Application shutdown...")
复制代码

性能监控

使用Prometheus和Grafana进行性能监控:
  1. # app/core/metrics.py
  2. from prometheus_client import Counter, Histogram, Gauge, generate_latest
  3. from fastapi import Response
  4. import time
  5. # 定义指标
  6. REQUEST_COUNT = Counter(
  7.     'http_requests_total',
  8.     'Total HTTP Requests',
  9.     ['method', 'endpoint', 'status_code']
  10. )
  11. REQUEST_DURATION = Histogram(
  12.     'http_request_duration_seconds',
  13.     'HTTP Request Duration',
  14.     ['method', 'endpoint']
  15. )
  16. ACTIVE_USERS = Gauge(
  17.     'active_users',
  18.     'Number of active users'
  19. )
  20. POST_COUNT = Gauge(
  21.     'post_count',
  22.     'Total number of posts'
  23. )
  24. # Prometheus端点
  25. async def metrics_endpoint():
  26.     return Response(generate_latest(), media_type="text/plain")
  27. # 指标装饰器
  28. def monitor_requests(endpoint_name: str):
  29.     def decorator(func):
  30.         async def wrapper(*args, **kwargs):
  31.             start_time = time.time()
  32.             
  33.             # 获取请求对象
  34.             request = None
  35.             for arg in args:
  36.                 if hasattr(arg, 'method'):
  37.                     request = arg
  38.                     break
  39.             
  40.             # 执行原始函数
  41.             response = await func(*args, **kwargs)
  42.             
  43.             # 计算处理时间
  44.             process_time = time.time() - start_time
  45.             
  46.             # 更新指标
  47.             if request:
  48.                 REQUEST_COUNT.labels(
  49.                     method=request.method,
  50.                     endpoint=endpoint_name,
  51.                     status_code=response.status_code
  52.                 ).inc()
  53.                
  54.                 REQUEST_DURATION.labels(
  55.                     method=request.method,
  56.                     endpoint=endpoint_name
  57.                 ).observe(process_time)
  58.             
  59.             return response
  60.         
  61.         return wrapper
  62.     return decorator
复制代码
  1. # app/main.py
  2. from fastapi import FastAPI
  3. from app.api.v1.api import api_router
  4. from app.core.config import settings
  5. from app.core.metrics import metrics_endpoint
  6. app = FastAPI(
  7.     title=settings.PROJECT_NAME,
  8.     openapi_url=f"{settings.API_V1_STR}/openapi.json"
  9. )
  10. app.include_router(api_router, prefix=settings.API_V1_STR)
  11. # 添加Prometheus指标端点
  12. app.add_route("/metrics", metrics_endpoint)
复制代码

总结与最佳实践

通过这个完整的FastAPI实战项目,我们展示了从API设计到系统优化的全过程。以下是一些关键的最佳实践总结:

1. 项目结构设计

• 使用模块化结构,按功能划分代码
• 遵循依赖注入原则,提高代码可测试性
• 使用类型提示,提高代码可读性和IDE支持

2. API设计

• 遵循RESTful设计原则
• 使用Pydantic模型进行数据验证和序列化
• 提供清晰的错误信息和适当的HTTP状态码

3. 认证与授权

• 使用JWT进行无状态认证
• 实现基于角色的访问控制
• 使用FastAPI的依赖注入系统简化权限检查

4. 数据库集成

• 使用SQLAlchemy ORM进行数据库操作
• 实现异步数据库操作提高性能
• 使用数据库迁移工具管理架构变更

5. 缓存策略

• 使用Redis缓存常用数据
• 实现缓存失效机制保持数据一致性
• 使用装饰器简化缓存逻辑

6. 异步处理

• 利用FastAPI的异步特性提高并发性能
• 使用Celery处理后台任务
• 实现任务状态跟踪和结果查询

7. 性能优化

• 优化数据库查询,减少N+1问题
• 使用响应压缩减少传输数据量
• 实现异步端点提高吞吐量

8. 测试策略

• 编写单元测试确保代码质量
• 使用TestClient进行API端点测试
• 实现测试夹具简化测试设置

9. 部署与监控

• 使用Docker容器化应用
• 实现日志记录和监控
• 使用Prometheus和Grafana进行性能监控

通过遵循这些最佳实践,你可以构建出高性能、可维护、可扩展的FastAPI应用,满足现代Web开发的需求。FastAPI的强大功能和简洁设计使其成为构建API的理想选择,无论是小型项目还是大型企业应用,都能胜任。

希望这个实战项目案例能帮助你更好地理解和应用FastAPI,掌握现代Web开发的精髓。
「七転び八起き(ななころびやおき)」
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

加入频道

加入频道

加入社群

加入社群

联系我们|小黑屋|TG频道|RSS

Powered by Pixtech

© 2025 Pixtech Team.