|
|
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
引言
FastAPI是近年来备受瞩目的Python Web框架,它凭借出色的性能、直观的API设计和现代化的开发体验,迅速成为构建高性能API的首选框架之一。本文将通过一个完整的实战项目,深入探讨FastAPI的各项关键技术,帮助开发者从零开始构建一个生产级别的Web应用。
FastAPI的核心优势在于:
• 基于Starlette和Pydantic,提供高性能的异步处理能力
• 自动生成交互式API文档(Swagger UI)
• 内置数据验证和序列化功能
• 支持依赖注入系统,便于测试和模块化开发
• 类型提示支持,提供更好的IDE体验和代码质量
在本文中,我们将通过构建一个完整的博客系统来展示FastAPI的强大功能,涵盖从基础API设计到高级系统优化的全过程。
项目结构设计
良好的项目结构是成功的一半。对于FastAPI项目,推荐采用以下结构:
- blog_system/
- ├── app/
- │ ├── __init__.py
- │ ├── main.py # FastAPI应用入口
- │ ├── core/ # 核心配置
- │ │ ├── __init__.py
- │ │ ├── config.py # 配置管理
- │ │ └── security.py # 安全相关功能
- │ ├── api/ # API路由
- │ │ ├── __init__.py
- │ │ ├── v1/
- │ │ │ ├── __init__.py
- │ │ │ ├── endpoints/
- │ │ │ │ ├── __init__.py
- │ │ │ │ ├── posts.py
- │ │ │ │ ├── users.py
- │ │ │ │ └── comments.py
- │ │ │ └── api.py # API路由注册
- │ ├── models/ # 数据模型
- │ │ ├── __init__.py
- │ │ ├── user.py
- │ │ ├── post.py
- │ │ └── comment.py
- │ ├── schemas/ # Pydantic模型
- │ │ ├── __init__.py
- │ │ ├── user.py
- │ │ ├── post.py
- │ │ └── comment.py
- │ ├── crud/ # 数据库操作
- │ │ ├── __init__.py
- │ │ ├── base.py
- │ │ ├── user.py
- │ │ ├── post.py
- │ │ └── comment.py
- │ ├── db/ # 数据库配置
- │ │ ├── __init__.py
- │ │ ├── session.py # 数据库会话
- │ │ └── base_class.py # 基础模型类
- │ └── utils/ # 工具函数
- │ ├── __init__.py
- │ └── helpers.py
- ├── tests/ # 测试目录
- │ ├── __init__.py
- │ ├── conftest.py
- │ ├── test_api.py
- │ └── test_crud.py
- ├── alembic/ # 数据库迁移
- ├── requirements.txt # 项目依赖
- ├── .env # 环境变量
- └── Dockerfile # Docker配置
复制代码
这种结构遵循了领域驱动设计(DDD)的原则,将不同功能的代码分门别类地组织在一起,便于维护和扩展。
API设计与实现
RESTful API设计原则
在FastAPI中设计API时,我们遵循RESTful原则:
1. 使用HTTP动词表示操作类型(GET、POST、PUT、DELETE等)
2. 使用名词表示资源(如/users、/posts)
3. 使用嵌套资源表示关系(如/posts/{post_id}/comments)
4. 使用HTTP状态码表示操作结果
5. 支持内容协商(JSON、XML等)
基本路由实现
让我们从实现博客系统的基本路由开始:
- # app/main.py
- from fastapi import FastAPI
- from app.api.v1.api import api_router
- from app.core.config import settings
- from fastapi.middleware.cors import CORSMiddleware
- app = FastAPI(
- title=settings.PROJECT_NAME,
- openapi_url=f"{settings.API_V1_STR}/openapi.json"
- )
- # 设置CORS
- if settings.BACKEND_CORS_ORIGINS:
- app.add_middleware(
- CORSMiddleware,
- allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- app.include_router(api_router, prefix=settings.API_V1_STR)
- @app.get("/")
- async def root():
- return {"message": "Welcome to Blog System API"}
复制代码- # app/api/v1/api.py
- from fastapi import APIRouter
- from app.api.v1.endpoints import posts, users, comments
- api_router = APIRouter()
- api_router.include_router(users.router, prefix="/users", tags=["users"])
- api_router.include_router(posts.router, prefix="/posts", tags=["posts"])
- api_router.include_router(comments.router, prefix="/comments", tags=["comments"])
复制代码- # app/api/v1/endpoints/posts.py
- from typing import List, Optional
- from fastapi import APIRouter, Depends, HTTPException, Query
- from sqlalchemy.orm import Session
- from app import crud, models, schemas
- from app.api import deps
- router = APIRouter()
- @router.get("/", response_model=List[schemas.Post])
- def read_posts(
- db: Session = Depends(deps.get_db),
- skip: int = 0,
- limit: int = 100,
- search: Optional[str] = Query(None, min_length=3, description="Search in post title and content")
- ):
- """
- Retrieve posts.
- """
- if search:
- posts = crud.post.search(db, search=search, skip=skip, limit=limit)
- else:
- posts = crud.post.get_multi(db, skip=skip, limit=limit)
- return posts
- @router.post("/", response_model=schemas.Post)
- def create_post(
- *,
- db: Session = Depends(deps.get_db),
- post_in: schemas.PostCreate,
- current_user: models.User = Depends(deps.get_current_active_user),
- ):
- """
- Create new post.
- """
- post = crud.post.create_with_owner(db=db, obj_in=post_in, owner_id=current_user.id)
- return post
- @router.get("/{post_id}", response_model=schemas.Post)
- def read_post(
- *,
- db: Session = Depends(deps.get_db),
- post_id: int,
- ):
- """
- Get post by ID.
- """
- post = crud.post.get(db=db, id=post_id)
- if not post:
- raise HTTPException(status_code=404, detail="Post not found")
- return post
- @router.put("/{post_id}", response_model=schemas.Post)
- def update_post(
- *,
- db: Session = Depends(deps.get_db),
- post_id: int,
- post_in: schemas.PostUpdate,
- current_user: models.User = Depends(deps.get_current_active_user),
- ):
- """
- Update a post.
- """
- post = crud.post.get(db=db, id=post_id)
- if not post:
- raise HTTPException(status_code=404, detail="Post not found")
- if post.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="Not enough permissions")
- post = crud.post.update(db=db, db_obj=post, obj_in=post_in)
- return post
- @router.delete("/{post_id}", response_model=schemas.Post)
- def delete_post(
- *,
- db: Session = Depends(deps.get_db),
- post_id: int,
- current_user: models.User = Depends(deps.get_current_active_user),
- ):
- """
- Delete a post.
- """
- post = crud.post.get(db=db, id=post_id)
- if not post:
- raise HTTPException(status_code=404, detail="Post not found")
- if post.owner_id != current_user.id:
- raise HTTPException(status_code=403, detail="Not enough permissions")
- post = crud.post.remove(db=db, id=post_id)
- return post
复制代码
数据模型与Pydantic Schema
在FastAPI中,我们使用SQLAlchemy定义数据库模型,使用Pydantic定义请求和响应的数据结构:
- # app/models/post.py
- from sqlalchemy import Boolean, Column, Integer, String, Text, ForeignKey, DateTime
- from sqlalchemy.orm import relationship
- from sqlalchemy.sql import func
- from app.db.base_class import Base
- class Post(Base):
- id = Column(Integer, primary_key=True, index=True)
- title = Column(String(256), nullable=False, index=True)
- content = Column(Text, nullable=False)
- published = Column(Boolean, default=False)
- created_at = Column(DateTime(timezone=True), server_default=func.now())
- updated_at = Column(DateTime(timezone=True), onupdate=func.now())
- owner_id = Column(Integer, ForeignKey("user.id"), nullable=False)
-
- owner = relationship("User", back_populates="posts")
- comments = relationship("Comment", back_populates="post", cascade="all, delete-orphan")
复制代码- # app/schemas/post.py
- from typing import Optional, List
- from datetime import datetime
- from pydantic import BaseModel
- # Base schema with common attributes
- class PostBase(BaseModel):
- title: str
- content: str
- published: bool = False
- # Schema for creating a new post
- class PostCreate(PostBase):
- pass
- # Schema for updating a post
- class PostUpdate(PostBase):
- title: Optional[str] = None
- content: Optional[str] = None
- published: Optional[bool] = None
- # Schema for returning post data
- class Post(PostBase):
- id: int
- created_at: datetime
- updated_at: Optional[datetime] = None
- owner_id: int
-
- class Config:
- orm_mode = True
复制代码
认证与授权
JWT认证实现
在FastAPI中实现JWT认证非常简单,我们可以使用OAuth2密码流和JWT令牌:
- # app/core/security.py
- from datetime import datetime, timedelta
- from typing import Any, Union, Optional
- from jose import jwt
- from passlib.context import CryptContext
- from app.core.config import settings
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
- def create_access_token(
- subject: Union[str, Any], expires_delta: Optional[timedelta] = None
- ) -> str:
- """
- 创建JWT访问令牌
- """
- if expires_delta:
- expire = datetime.utcnow() + expires_delta
- else:
- expire = datetime.utcnow() + timedelta(
- minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
- )
- to_encode = {"exp": expire, "sub": str(subject)}
- encoded_jwt = jwt.encode(
- to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM
- )
- return encoded_jwt
- def verify_password(plain_password: str, hashed_password: str) -> bool:
- """
- 验证密码
- """
- return pwd_context.verify(plain_password, hashed_password)
- def get_password_hash(password: str) -> str:
- """
- 获取密码哈希值
- """
- return pwd_context.hash(password)
复制代码- # app/api/v1/endpoints/users.py
- from datetime import timedelta
- from typing import Any
- from fastapi import APIRouter, Depends, HTTPException
- from fastapi.security import OAuth2PasswordRequestForm
- from sqlalchemy.orm import Session
- from app import crud, models, schemas
- from app.api import deps
- from app.core import security
- from app.core.config import settings
- router = APIRouter()
- @router.post("/login/access-token", response_model=schemas.Token)
- def login_access_token(
- db: Session = Depends(deps.get_db),
- form_data: OAuth2PasswordRequestForm = Depends()
- ) -> Any:
- """
- OAuth2 compatible token login, get an access token for future requests
- """
- user = crud.user.authenticate(
- db, email=form_data.username, password=form_data.password
- )
- if not user:
- raise HTTPException(status_code=400, detail="Incorrect email or password")
- elif not crud.user.is_active(user):
- raise HTTPException(status_code=400, detail="Inactive user")
- access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
- return {
- "access_token": security.create_access_token(
- user.id, expires_delta=access_token_expires
- ),
- "token_type": "bearer",
- }
复制代码
权限控制
FastAPI的依赖注入系统使得实现权限控制变得非常简单:
- # app/api/deps.py
- from fastapi import Depends, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer
- from jose import jwt
- from pydantic import ValidationError
- from sqlalchemy.orm import Session
- from app import crud, models, schemas
- from app.core import security
- from app.core.config import settings
- from app.db.session import SessionLocal
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/login/access-token")
- def get_db() -> Session:
- """
- 获取数据库会话
- """
- db = SessionLocal()
- try:
- yield db
- finally:
- db.close()
- def get_current_user(
- db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
- ) -> models.User:
- """
- 获取当前用户
- """
- try:
- payload = jwt.decode(
- token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
- )
- token_data = schemas.TokenPayload(**payload)
- except (jwt.JWTError, ValidationError):
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Could not validate credentials",
- )
- user = crud.user.get(db, id=token_data.sub)
- if not user:
- raise HTTPException(status_code=404, detail="User not found")
- return user
- def get_current_active_user(
- current_user: models.User = Depends(get_current_user),
- ) -> models.User:
- """
- 获取当前活跃用户
- """
- if not crud.user.is_active(current_user):
- raise HTTPException(status_code=400, detail="Inactive user")
- return current_user
- def get_current_active_superuser(
- current_user: models.User = Depends(get_current_user),
- ) -> models.User:
- """
- 获取当前活跃超级用户
- """
- if not crud.user.is_superuser(current_user):
- raise HTTPException(
- status_code=400, detail="The user doesn't have enough privileges"
- )
- return current_user
复制代码
使用这些依赖项,我们可以轻松地保护API端点:
- # app/api/v1/endpoints/users.py
- @router.get("/me", response_model=schemas.User)
- def read_user_me(
- current_user: models.User = Depends(deps.get_current_active_user),
- ):
- """
- 获取当前用户信息
- """
- return current_user
- @router.post("/", response_model=schemas.User)
- def create_user(
- *,
- db: Session = Depends(deps.get_db),
- user_in: schemas.UserCreate,
- current_user: models.User = Depends(deps.get_current_active_superuser),
- ):
- """
- 创建新用户(仅限超级用户)
- """
- user = crud.user.get_by_email(db, email=user_in.email)
- if user:
- raise HTTPException(
- status_code=400,
- detail="The user with this email already exists in the system.",
- )
- user = crud.user.create(db, obj_in=user_in)
- return user
复制代码
数据库集成
SQLAlchemy配置
FastAPI与SQLAlchemy的集成非常顺畅,我们可以使用异步SQLAlchemy来提高性能:
- # app/db/session.py
- from sqlalchemy import create_engine
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy.orm import sessionmaker
- from app.core.config import settings
- # 创建同步SQLAlchemy引擎
- engine = create_engine(settings.SQLALCHEMY_DATABASE_URI, pool_pre_ping=True)
- SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
- # 创建异步SQLAlchemy引擎
- from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
- async_engine = create_async_engine(settings.SQLALCHEMY_ASYNC_DATABASE_URI)
- AsyncSessionLocal = sessionmaker(
- async_engine, class_=AsyncSession, expire_on_commit=False
- )
- # 基础模型类
- Base = declarative_base()
- # 获取数据库会话的依赖项
- def get_db() -> Session:
- db = SessionLocal()
- try:
- yield db
- finally:
- db.close()
- # 获取异步数据库会话的依赖项
- async def get_async_db() -> AsyncSession:
- async with AsyncSessionLocal() as session:
- try:
- yield session
- finally:
- await session.close()
复制代码
CRUD操作
使用SQLAlchemy ORM进行CRUD操作:
- # app/crud/base.py
- from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
- from fastapi.encoders import jsonable_encoder
- from pydantic import BaseModel
- from sqlalchemy.orm import Session
- from app.db.base_class import Base
- ModelType = TypeVar("ModelType", bound=Base)
- CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
- UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
- class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
- def __init__(self, model: Type[ModelType]):
- """
- CRUD object with default methods to Create, Read, Update, Delete (CRUD).
- """
- self.model = model
- def get(self, db: Session, id: Any) -> Optional[ModelType]:
- return db.query(self.model).filter(self.model.id == id).first()
- def get_multi(
- self, db: Session, *, skip: int = 0, limit: int = 100
- ) -> List[ModelType]:
- return db.query(self.model).offset(skip).limit(limit).all()
- def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
- obj_in_data = jsonable_encoder(obj_in)
- db_obj = self.model(**obj_in_data) # type: ignore
- db.add(db_obj)
- db.commit()
- db.refresh(db_obj)
- return db_obj
- def update(
- self,
- db: Session,
- *,
- db_obj: ModelType,
- obj_in: Union[UpdateSchemaType, Dict[str, Any]]
- ) -> ModelType:
- obj_data = jsonable_encoder(db_obj)
- if isinstance(obj_in, dict):
- update_data = obj_in
- else:
- update_data = obj_in.dict(exclude_unset=True)
- for field in obj_data:
- if field in update_data:
- setattr(db_obj, field, update_data[field])
- db.add(db_obj)
- db.commit()
- db.refresh(db_obj)
- return db_obj
- def remove(self, db: Session, *, id: int) -> ModelType:
- obj = db.query(self.model).get(id)
- db.delete(obj)
- db.commit()
- return obj
复制代码- # app/crud/post.py
- from typing import List, Optional
- from sqlalchemy.orm import Session
- from sqlalchemy import or_
- from app.crud.base import CRUDBase
- from app.models.post import Post
- from app.schemas.post import PostCreate, PostUpdate
- class CRUDPost(CRUDBase[Post, PostCreate, PostUpdate]):
- def create_with_owner(
- self, db: Session, *, obj_in: PostCreate, owner_id: int
- ) -> Post:
- obj_in_data = obj_in.dict()
- db_obj = Post(**obj_in_data, owner_id=owner_id)
- db.add(db_obj)
- db.commit()
- db.refresh(db_obj)
- return db_obj
-
- def get_multi_by_owner(
- self, db: Session, *, owner_id: int, skip: int = 0, limit: int = 100
- ) -> List[Post]:
- return (
- db.query(self.model)
- .filter(Post.owner_id == owner_id)
- .offset(skip)
- .limit(limit)
- .all()
- )
-
- def search(
- self, db: Session, *, search: str, skip: int = 0, limit: int = 100
- ) -> List[Post]:
- return (
- db.query(self.model)
- .filter(
- or_(
- Post.title.ilike(f"%{search}%"),
- Post.content.ilike(f"%{search}%")
- )
- )
- .offset(skip)
- .limit(limit)
- .all()
- )
- post = CRUDPost(Post)
复制代码
异步数据库操作
FastAPI支持异步操作,我们可以使用SQLAlchemy的异步API来提高性能:
- # app/crud/async_post.py
- from typing import List, Optional
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select, or_
- from app.models.post import Post
- from app.schemas.post import PostCreate, PostUpdate
- async def async_get_post(db: AsyncSession, post_id: int) -> Optional[Post]:
- result = await db.execute(select(Post).where(Post.id == post_id))
- return result.scalar_one_or_none()
- async def async_get_posts(
- db: AsyncSession, skip: int = 0, limit: int = 100
- ) -> List[Post]:
- result = await db.execute(select(Post).offset(skip).limit(limit))
- return result.scalars().all()
- async def async_create_post(
- db: AsyncSession, *, post_in: PostCreate, owner_id: int
- ) -> Post:
- db_post = Post(**post_in.dict(), owner_id=owner_id)
- db.add(db_post)
- await db.commit()
- await db.refresh(db_post)
- return db_post
- async def async_search_posts(
- db: AsyncSession, *, search: str, skip: int = 0, limit: int = 100
- ) -> List[Post]:
- result = await db.execute(
- select(Post)
- .where(
- or_(
- Post.title.ilike(f"%{search}%"),
- Post.content.ilike(f"%{search}%")
- )
- )
- .offset(skip)
- .limit(limit)
- )
- return result.scalars().all()
复制代码
缓存策略
Redis缓存集成
缓存是提高API性能的重要手段,我们可以使用Redis来实现缓存:
- # app/core/redis.py
- import redis
- import json
- from app.core.config import settings
- # 创建Redis连接
- redis_client = redis.Redis(
- host=settings.REDIS_HOST,
- port=settings.REDIS_PORT,
- db=settings.REDIS_DB,
- password=settings.REDIS_PASSWORD,
- decode_responses=True
- )
- def get_from_cache(key: str):
- """
- 从缓存中获取数据
- """
- try:
- data = redis_client.get(key)
- if data:
- return json.loads(data)
- return None
- except Exception as e:
- print(f"Redis GET error: {e}")
- return None
- def set_to_cache(key: str, value: dict, expire: int = 3600):
- """
- 设置缓存数据
- """
- try:
- redis_client.setex(key, expire, json.dumps(value))
- except Exception as e:
- print(f"Redis SET error: {e}")
- def delete_from_cache(key: str):
- """
- 删除缓存数据
- """
- try:
- redis_client.delete(key)
- except Exception as e:
- print(f"Redis DELETE error: {e}")
复制代码
缓存装饰器
我们可以创建一个缓存装饰器,方便在API端点中使用缓存:
- # app/utils/cache.py
- import json
- from functools import wraps
- from fastapi import Request, Response
- from app.core.redis import get_from_cache, set_to_cache, delete_from_cache
- def cache_response(expire: int = 3600):
- """
- 缓存响应装饰器
- """
- def decorator(func):
- @wraps(func)
- async def wrapper(*args, **kwargs):
- # 尝试从kwargs中获取request对象
- request = None
- for arg in args:
- if isinstance(arg, Request):
- request = arg
- break
-
- if request:
- # 生成缓存键
- cache_key = f"{request.url.path}:{json.dumps(request.query_params)}"
-
- # 尝试从缓存获取数据
- cached_data = get_from_cache(cache_key)
- if cached_data:
- return cached_data
-
- # 执行原始函数
- response = await func(*args, **kwargs)
-
- # 将响应存入缓存
- if isinstance(response, Response):
- set_to_cache(cache_key, response.body.decode(), expire)
- else:
- set_to_cache(cache_key, response, expire)
-
- return response
- else:
- # 如果没有request对象,直接执行原始函数
- return await func(*args, **kwargs)
-
- return wrapper
- return decorator
- def invalidate_cache(pattern: str):
- """
- 缓存失效装饰器
- """
- def decorator(func):
- @wraps(func)
- async def wrapper(*args, **kwargs):
- # 执行原始函数
- result = await func(*args, **kwargs)
-
- # 使匹配模式的缓存失效
- try:
- from app.core.redis import redis_client
- for key in redis_client.scan_iter(match=pattern):
- redis_client.delete(key)
- except Exception as e:
- print(f"Cache invalidation error: {e}")
-
- return result
-
- return wrapper
- return decorator
复制代码
在API中使用缓存
现在我们可以在API端点中使用缓存:
- # app/api/v1/endpoints/posts.py
- from fastapi import APIRouter, Depends, HTTPException, Query, Request
- from sqlalchemy.orm import Session
- from app import crud, models, schemas
- from app.api import deps
- from app.utils.cache import cache_response, invalidate_cache
- router = APIRouter()
- @router.get("/", response_model=List[schemas.Post])
- @cache_response(expire=1800) # 缓存30分钟
- def read_posts(
- request: Request,
- db: Session = Depends(deps.get_db),
- skip: int = 0,
- limit: int = 100,
- search: str = None,
- ):
- """
- 检索文章列表,支持缓存
- """
- if search:
- posts = crud.post.search(db, search=search, skip=skip, limit=limit)
- else:
- posts = crud.post.get_multi(db, skip=skip, limit=limit)
- return posts
- @router.post("/", response_model=schemas.Post)
- @invalidate_cache(pattern="/api/v1/posts*") # 使文章列表缓存失效
- def create_post(
- *,
- db: Session = Depends(deps.get_db),
- post_in: schemas.PostCreate,
- current_user: models.User = Depends(deps.get_current_active_user),
- ):
- """
- 创建新文章,并使相关缓存失效
- """
- post = crud.post.create_with_owner(db=db, obj_in=post_in, owner_id=current_user.id)
- return post
复制代码
异步处理
异步任务处理
FastAPI原生支持异步操作,我们可以使用它来处理长时间运行的任务:
- # app/utils/tasks.py
- import asyncio
- from typing import Dict, Any
- from fastapi import BackgroundTasks
- from app.core.redis import redis_client
- import json
- async def process_data(data: Dict[str, Any]):
- """
- 模拟一个长时间运行的数据处理任务
- """
- # 模拟耗时操作
- await asyncio.sleep(10)
-
- # 处理数据
- processed_data = {k: v.upper() if isinstance(v, str) else v for k, v in data.items()}
-
- # 将结果存储到Redis
- task_id = data.get("task_id")
- if task_id:
- redis_client.setex(f"task_result:{task_id}", 3600, json.dumps(processed_data))
-
- return processed_data
- def run_task_in_background(background_tasks: BackgroundTasks, data: Dict[str, Any]):
- """
- 在后台运行任务
- """
- background_tasks.add_task(process_data, data)
复制代码- # app/api/v1/endpoints/tasks.py
- from fastapi import APIRouter, BackgroundTasks, HTTPException
- from app.utils.tasks import run_task_in_background
- from app.core.redis import redis_client
- import json
- import uuid
- router = APIRouter()
- @router.post("/process")
- async def create_processing_task(background_tasks: BackgroundTasks):
- """
- 创建一个数据处理任务
- """
- task_id = str(uuid.uuid4())
- data = {"task_id": task_id, "name": "sample data", "value": 42}
-
- # 在后台运行任务
- run_task_in_background(background_tasks, data)
-
- return {"task_id": task_id, "message": "Task started"}
- @router.get("/status/{task_id}")
- async def get_task_status(task_id: str):
- """
- 获取任务状态和结果
- """
- result = redis_client.get(f"task_result:{task_id}")
- if result:
- return {"task_id": task_id, "status": "completed", "result": json.loads(result)}
- else:
- return {"task_id": task_id, "status": "processing"}
复制代码
使用Celery进行分布式任务处理
对于更复杂的异步任务处理,我们可以集成Celery:
- # app/core/celery_app.py
- from celery import Celery
- from app.core.config import settings
- # 创建Celery实例
- celery_app = Celery(
- "blog_system",
- broker=settings.CELERY_BROKER_URL,
- backend=settings.CELERY_RESULT_BACKEND,
- include=["app.tasks"]
- )
- # Celery配置
- celery_app.conf.update(
- task_serializer="json",
- result_serializer="json",
- accept_content=["json"],
- timezone="UTC",
- enable_utc=True,
- task_track_started=True,
- task_time_limit=30 * 60, # 30分钟超时
- task_soft_time_limit=25 * 60, # 25分钟软超时
- worker_prefetch_multiplier=1, # 每个worker一次只取一个任务
- )
复制代码- # app/tasks.py
- from app.core.celery_app import celery_app
- from app.core.redis import redis_client
- import json
- import time
- @celery_app.task(bind=True)
- def process_data_task(self, data):
- """
- 使用Celery处理数据的任务
- """
- try:
- # 更新任务状态
- self.update_state(
- state="PROGRESS",
- meta={"current": 1, "total": 3, "status": "Starting data processing"}
- )
-
- # 模拟数据处理步骤1
- time.sleep(5)
- processed_data = {k: v.upper() if isinstance(v, str) else v for k, v in data.items()}
-
- # 更新任务状态
- self.update_state(
- state="PROGRESS",
- meta={"current": 2, "total": 3, "status": "Processing data"}
- )
-
- # 模拟数据处理步骤2
- time.sleep(5)
- processed_data["processed_at"] = time.time()
-
- # 更新任务状态
- self.update_state(
- state="PROGRESS",
- meta={"current": 3, "total": 3, "status": "Saving results"}
- )
-
- # 模拟数据处理步骤3
- time.sleep(5)
- task_id = data.get("task_id")
- if task_id:
- redis_client.setex(f"celery_task_result:{task_id}", 3600, json.dumps(processed_data))
-
- # 返回结果
- return {"status": "completed", "result": processed_data}
-
- except Exception as e:
- # 更新任务状态为失败
- self.update_state(
- state="FAILURE",
- meta={"current": 3, "total": 3, "status": f"Error: {str(e)}"}
- )
- raise e
复制代码- # app/api/v1/endpoints/celery_tasks.py
- from fastapi import APIRouter, HTTPException
- from app.tasks import process_data_task
- from app.core.redis import redis_client
- import json
- import uuid
- router = APIRouter()
- @router.post("/process")
- async def create_celery_task():
- """
- 创建一个Celery数据处理任务
- """
- task_id = str(uuid.uuid4())
- data = {"task_id": task_id, "name": "sample data", "value": 42}
-
- # 启动Celery任务
- task = process_data_task.delay(data)
-
- return {"task_id": task_id, "celery_task_id": task.id, "message": "Task started"}
- @router.get("/status/{task_id}")
- async def get_celery_task_status(task_id: str):
- """
- 获取Celery任务状态和结果
- """
- # 从Redis获取结果
- result = redis_client.get(f"celery_task_result:{task_id}")
- if result:
- return {"task_id": task_id, "status": "completed", "result": json.loads(result)}
-
- # 如果Redis中没有结果,返回任务处理中
- return {"task_id": task_id, "status": "processing"}
复制代码
性能优化
数据库优化
数据库查询优化是提高API性能的关键:
- # app/crud/optimized_post.py
- from typing import List, Optional
- from sqlalchemy.orm import Session, joinedload
- from sqlalchemy import func, or_
- from app.models.post import Post
- from app.models.user import User
- from app.schemas.post import PostCreate, PostUpdate
- def get_post_with_author(db: Session, post_id: int) -> Optional[Post]:
- """
- 获取文章及其作者信息(使用joinedload优化查询)
- """
- return (
- db.query(Post)
- .options(joinedload(Post.owner))
- .filter(Post.id == post_id)
- .first()
- )
- def get_posts_with_pagination(
- db: Session, skip: int = 0, limit: int = 100
- ) -> tuple[List[Post], int]:
- """
- 获取文章列表及总数(用于分页)
- """
- # 使用一个查询获取总数
- total = db.query(func.count(Post.id)).scalar()
-
- # 使用另一个查询获取分页数据
- posts = (
- db.query(Post)
- .options(joinedload(Post.owner))
- .offset(skip)
- .limit(limit)
- .all()
- )
-
- return posts, total
- def get_popular_posts(db: Session, limit: int = 10) -> List[Post]:
- """
- 获取热门文章(基于评论数量)
- """
- return (
- db.query(Post)
- .join(Post.comments)
- .group_by(Post.id)
- .order_by(func.count(Post.comments).desc())
- .limit(limit)
- .all()
- )
- def search_posts_optimized(
- db: Session, *, search: str, skip: int = 0, limit: int = 100
- ) -> List[Post]:
- """
- 优化的文章搜索(使用全文索引)
- """
- return (
- db.query(Post)
- .filter(
- or_(
- Post.title.ilike(f"%{search}%"),
- Post.content.ilike(f"%{search}%")
- )
- )
- .options(joinedload(Post.owner))
- .offset(skip)
- .limit(limit)
- .all()
- )
复制代码
响应压缩
FastAPI支持响应压缩,可以显著减少传输数据量:
- # app/main.py
- from fastapi import FastAPI
- from fastapi.middleware.gzip import GZipMiddleware
- from app.api.v1.api import api_router
- from app.core.config import settings
- app = FastAPI(
- title=settings.PROJECT_NAME,
- openapi_url=f"{settings.API_V1_STR}/openapi.json"
- )
- # 添加GZip中间件
- app.add_middleware(GZipMiddleware, minimum_size=1000)
- app.include_router(api_router, prefix=settings.API_V1_STR)
复制代码
异步端点
使用异步端点可以提高并发性能:
- # app/api/v1/endpoints/async_posts.py
- from typing import List, Optional
- from fastapi import APIRouter, Depends, HTTPException, Query
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select, or_, func
- from app.models.post import Post
- from app.models.user import User
- from app.schemas.post import Post, PostCreate
- from app.api.deps import get_async_db
- from app.crud.async_post import async_create_post
- router = APIRouter()
- @router.get("/", response_model=List[Post])
- async def read_posts(
- db: AsyncSession = Depends(get_async_db),
- skip: int = 0,
- limit: int = 100,
- search: Optional[str] = Query(None, min_length=3),
- ):
- """
- 异步获取文章列表
- """
- if search:
- query = (
- select(Post)
- .where(
- or_(
- Post.title.ilike(f"%{search}%"),
- Post.content.ilike(f"%{search}%")
- )
- )
- .offset(skip)
- .limit(limit)
- )
- else:
- query = select(Post).offset(skip).limit(limit)
-
- result = await db.execute(query)
- return result.scalars().all()
- @router.post("/", response_model=Post)
- async def create_post(
- *,
- db: AsyncSession = Depends(get_async_db),
- post_in: PostCreate,
- # 假设我们有一个异步的get_current_active_user函数
- current_user: User = Depends(get_current_active_user),
- ):
- """
- 异步创建新文章
- """
- return await async_create_post(db=db, post_in=post_in, owner_id=current_user.id)
复制代码
测试策略
单元测试
使用pytest进行单元测试:
- # tests/test_crud.py
- import pytest
- from sqlalchemy import create_engine
- from sqlalchemy.orm import sessionmaker
- from app.db.base_class import Base
- from app.crud.post import post
- from app.models.post import Post
- from app.schemas.post import PostCreate, PostUpdate
- from app.core.security import get_password_hash
- # 测试数据库
- SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
- engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
- TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
- @pytest.fixture
- def db_session():
- Base.metadata.create_all(bind=engine)
- db = TestingSessionLocal()
- try:
- yield db
- finally:
- db.close()
- Base.metadata.drop_all(bind=engine)
- def test_create_post(db_session):
- title = "Test Post"
- content = "This is a test post"
- post_in = PostCreate(title=title, content=content)
-
- # 创建一个测试用户
- from app.models.user import User
- user = User(
- email="test@example.com",
- hashed_password=get_password_hash("testpassword"),
- full_name="Test User",
- is_active=True,
- )
- db_session.add(user)
- db_session.commit()
- db_session.refresh(user)
-
- # 创建文章
- created_post = post.create_with_owner(db=db_session, obj_in=post_in, owner_id=user.id)
-
- assert created_post.title == title
- assert created_post.content == content
- assert created_post.owner_id == user.id
- def test_get_post(db_session):
- title = "Test Post"
- content = "This is a test post"
- post_in = PostCreate(title=title, content=content)
-
- # 创建一个测试用户
- from app.models.user import User
- user = User(
- email="test@example.com",
- hashed_password=get_password_hash("testpassword"),
- full_name="Test User",
- is_active=True,
- )
- db_session.add(user)
- db_session.commit()
- db_session.refresh(user)
-
- # 创建文章
- created_post = post.create_with_owner(db=db_session, obj_in=post_in, owner_id=user.id)
-
- # 获取文章
- retrieved_post = post.get(db=db_session, id=created_post.id)
-
- assert retrieved_post
- assert retrieved_post.id == created_post.id
- assert retrieved_post.title == title
- assert retrieved_post.content == content
- def test_update_post(db_session):
- title = "Test Post"
- content = "This is a test post"
- post_in = PostCreate(title=title, content=content)
-
- # 创建一个测试用户
- from app.models.user import User
- user = User(
- email="test@example.com",
- hashed_password=get_password_hash("testpassword"),
- full_name="Test User",
- is_active=True,
- )
- db_session.add(user)
- db_session.commit()
- db_session.refresh(user)
-
- # 创建文章
- created_post = post.create_with_owner(db=db_session, obj_in=post_in, owner_id=user.id)
-
- # 更新文章
- new_title = "Updated Test Post"
- post_update = PostUpdate(title=new_title)
- updated_post = post.update(db=db_session, db_obj=created_post, obj_in=post_update)
-
- assert updated_post.title == new_title
- assert updated_post.content == content # 内容应保持不变
复制代码
API端点测试
使用httpx进行API端点测试:
- # tests/test_api.py
- import pytest
- from fastapi.testclient import TestClient
- from app.main import app
- from app.core.config import settings
- from app.core.security import create_access_token
- from app.models.user import User
- from app.schemas.user import UserCreate
- client = TestClient(app)
- @pytest.fixture
- def test_user():
- return {
- "email": "test@example.com",
- "password": "testpassword",
- "full_name": "Test User"
- }
- @pytest.fixture
- def user_token_headers(test_user):
- # 创建用户
- response = client.post(
- f"{settings.API_V1_STR}/users/", json=test_user
- )
- assert response.status_code == 200
-
- # 获取访问令牌
- login_data = {
- "username": test_user["email"],
- "password": test_user["password"]
- }
- response = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
- assert response.status_code == 200
- token = response.json()["access_token"]
-
- return {"Authorization": f"Bearer {token}"}
- def test_create_post(user_token_headers):
- post_data = {
- "title": "Test Post",
- "content": "This is a test post",
- "published": False
- }
- response = client.post(
- f"{settings.API_V1_STR}/posts/",
- headers=user_token_headers,
- json=post_data
- )
- assert response.status_code == 200
- content = response.json()
- assert content["title"] == post_data["title"]
- assert content["content"] == post_data["content"]
- assert content["published"] == post_data["published"]
- assert "id" in content
- assert "owner_id" in content
- def test_read_posts(user_token_headers):
- # 先创建一个文章
- post_data = {
- "title": "Test Post for Reading",
- "content": "This is a test post for reading",
- "published": True
- }
- client.post(
- f"{settings.API_V1_STR}/posts/",
- headers=user_token_headers,
- json=post_data
- )
-
- # 读取文章列表
- response = client.get(
- f"{settings.API_V1_STR}/posts/",
- headers=user_token_headers
- )
- assert response.status_code == 200
- content = response.json()
- assert len(content) >= 1
- assert content[0]["title"] == post_data["title"]
- def test_update_post(user_token_headers):
- # 先创建一个文章
- post_data = {
- "title": "Test Post for Update",
- "content": "This is a test post for update",
- "published": False
- }
- response = client.post(
- f"{settings.API_V1_STR}/posts/",
- headers=user_token_headers,
- json=post_data
- )
- post_id = response.json()["id"]
-
- # 更新文章
- update_data = {
- "title": "Updated Test Post",
- "published": True
- }
- response = client.put(
- f"{settings.API_V1_STR}/posts/{post_id}",
- headers=user_token_headers,
- json=update_data
- )
- assert response.status_code == 200
- content = response.json()
- assert content["title"] == update_data["title"]
- assert content["content"] == post_data["content"] # 内容应保持不变
- assert content["published"] == update_data["published"]
- def test_delete_post(user_token_headers):
- # 先创建一个文章
- post_data = {
- "title": "Test Post for Deletion",
- "content": "This is a test post for deletion",
- "published": False
- }
- response = client.post(
- f"{settings.API_V1_STR}/posts/",
- headers=user_token_headers,
- json=post_data
- )
- post_id = response.json()["id"]
-
- # 删除文章
- response = client.delete(
- f"{settings.API_V1_STR}/posts/{post_id}",
- headers=user_token_headers
- )
- assert response.status_code == 200
-
- # 验证文章已被删除
- response = client.get(
- f"{settings.API_V1_STR}/posts/{post_id}",
- headers=user_token_headers
- )
- assert response.status_code == 404
复制代码
部署与监控
Docker部署
使用Docker容器化部署FastAPI应用:
- # Dockerfile
- FROM python:3.9-slim
- # 设置工作目录
- WORKDIR /app
- # 设置环境变量
- ENV PYTHONDONTWRITEBYTECODE 1
- ENV PYTHONUNBUFFERED 1
- # 安装系统依赖
- RUN apt-get update \
- && apt-get install -y --no-install-recommends \
- gcc \
- postgresql-client \
- && rm -rf /var/lib/apt/lists/*
- # 安装Python依赖
- COPY requirements.txt .
- RUN pip install --no-cache-dir -r requirements.txt
- # 复制项目代码
- COPY . .
- # 收集静态文件(如果有)
- # RUN python manage.py collectstatic --noinput
- # 创建非root用户
- RUN adduser --disabled-password --gecos '' appuser
- RUN chown -R appuser:appuser /app
- USER appuser
- # 暴露端口
- EXPOSE 8000
- # 启动命令
- CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
复制代码- # docker-compose.yml
- version: '3.8'
- services:
- web:
- build: .
- ports:
- - "8000:8000"
- depends_on:
- - db
- - redis
- environment:
- - DATABASE_URL=postgresql://fastapi_user:fastapi_password@db:5432/blog_db
- - REDIS_URL=redis://redis:6379/0
- volumes:
- - .:/app
- command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
- db:
- image: postgres:13
- environment:
- - POSTGRES_USER=fastapi_user
- - POSTGRES_PASSWORD=fastapi_password
- - POSTGRES_DB=blog_db
- volumes:
- - postgres_data:/var/lib/postgresql/data/
- ports:
- - "5432:5432"
- redis:
- image: redis:6-alpine
- ports:
- - "6379:6379"
- celery:
- build: .
- command: celery -A app.core.celery_app worker --loglevel=info
- volumes:
- - .:/app
- depends_on:
- - db
- - redis
- environment:
- - DATABASE_URL=postgresql://fastapi_user:fastapi_password@db:5432/blog_db
- - REDIS_URL=redis://redis:6379/0
- - CELERY_BROKER_URL=redis://redis:6379/0
- - CELERY_RESULT_BACKEND=redis://redis:6379/0
- flower:
- build: .
- command: celery -A app.core.celery_app flower --port=5555
- ports:
- - "5555:5555"
- depends_on:
- - redis
- environment:
- - CELERY_BROKER_URL=redis://redis:6379/0
- - CELERY_RESULT_BACKEND=redis://redis:6379/0
- volumes:
- postgres_data:
复制代码
日志和监控
实现日志记录和监控:
- # app/core/logging.py
- import logging
- import sys
- from pathlib import Path
- from app.core.config import settings
- # 创建日志目录
- log_dir = Path("logs")
- log_dir.mkdir(exist_ok=True)
- # 配置日志
- def setup_logging():
- """
- 配置应用日志
- """
- # 创建日志格式化器
- formatter = logging.Formatter(
- "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
- )
-
- # 创建文件处理器
- file_handler = logging.FileHandler("logs/app.log")
- file_handler.setFormatter(formatter)
- file_handler.setLevel(logging.INFO)
-
- # 创建控制台处理器
- console_handler = logging.StreamHandler(sys.stdout)
- console_handler.setFormatter(formatter)
- console_handler.setLevel(logging.INFO if not settings.DEBUG else logging.DEBUG)
-
- # 配置根日志记录器
- root_logger = logging.getLogger()
- root_logger.setLevel(logging.INFO if not settings.DEBUG else logging.DEBUG)
- root_logger.addHandler(file_handler)
- root_logger.addHandler(console_handler)
-
- # 配置第三方库日志级别
- logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
- logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)
-
- return root_logger
复制代码- # app/main.py
- from fastapi import FastAPI, Request, Response
- from fastapi.middleware.gzip import GZipMiddleware
- from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
- import time
- import logging
- from app.api.v1.api import api_router
- from app.core.config import settings
- from app.core.logging import setup_logging
- # 设置日志
- logger = setup_logging()
- app = FastAPI(
- title=settings.PROJECT_NAME,
- openapi_url=f"{settings.API_V1_STR}/openapi.json"
- )
- # 添加中间件
- if settings.ENABLE_HTTPS:
- app.add_middleware(HTTPSRedirectMiddleware)
- app.add_middleware(GZipMiddleware, minimum_size=1000)
- # 请求日志中间件
- @app.middleware("http")
- async def log_requests(request: Request, call_next):
- start_time = time.time()
-
- # 记录请求信息
- logger.info(f"Request started: {request.method} {request.url}")
-
- response = await call_next(request)
-
- # 计算处理时间
- process_time = time.time() - start_time
-
- # 记录响应信息
- logger.info(
- f"Request completed: {request.method} {request.url} - Status: {response.status_code} - Time: {process_time:.4f}s"
- )
-
- # 添加处理时间到响应头
- response.headers["X-Process-Time"] = str(process_time)
-
- return response
- app.include_router(api_router, prefix=settings.API_V1_STR)
- @app.on_event("startup")
- async def startup_event():
- logger.info("Application startup...")
- @app.on_event("shutdown")
- async def shutdown_event():
- logger.info("Application shutdown...")
复制代码
性能监控
使用Prometheus和Grafana进行性能监控:
- # app/core/metrics.py
- from prometheus_client import Counter, Histogram, Gauge, generate_latest
- from fastapi import Response
- import time
- # 定义指标
- REQUEST_COUNT = Counter(
- 'http_requests_total',
- 'Total HTTP Requests',
- ['method', 'endpoint', 'status_code']
- )
- REQUEST_DURATION = Histogram(
- 'http_request_duration_seconds',
- 'HTTP Request Duration',
- ['method', 'endpoint']
- )
- ACTIVE_USERS = Gauge(
- 'active_users',
- 'Number of active users'
- )
- POST_COUNT = Gauge(
- 'post_count',
- 'Total number of posts'
- )
- # Prometheus端点
- async def metrics_endpoint():
- return Response(generate_latest(), media_type="text/plain")
- # 指标装饰器
- def monitor_requests(endpoint_name: str):
- def decorator(func):
- async def wrapper(*args, **kwargs):
- start_time = time.time()
-
- # 获取请求对象
- request = None
- for arg in args:
- if hasattr(arg, 'method'):
- request = arg
- break
-
- # 执行原始函数
- response = await func(*args, **kwargs)
-
- # 计算处理时间
- process_time = time.time() - start_time
-
- # 更新指标
- if request:
- REQUEST_COUNT.labels(
- method=request.method,
- endpoint=endpoint_name,
- status_code=response.status_code
- ).inc()
-
- REQUEST_DURATION.labels(
- method=request.method,
- endpoint=endpoint_name
- ).observe(process_time)
-
- return response
-
- return wrapper
- return decorator
复制代码- # app/main.py
- from fastapi import FastAPI
- from app.api.v1.api import api_router
- from app.core.config import settings
- from app.core.metrics import metrics_endpoint
- app = FastAPI(
- title=settings.PROJECT_NAME,
- openapi_url=f"{settings.API_V1_STR}/openapi.json"
- )
- app.include_router(api_router, prefix=settings.API_V1_STR)
- # 添加Prometheus指标端点
- app.add_route("/metrics", metrics_endpoint)
复制代码
总结与最佳实践
通过这个完整的FastAPI实战项目,我们展示了从API设计到系统优化的全过程。以下是一些关键的最佳实践总结:
1. 项目结构设计
• 使用模块化结构,按功能划分代码
• 遵循依赖注入原则,提高代码可测试性
• 使用类型提示,提高代码可读性和IDE支持
2. API设计
• 遵循RESTful设计原则
• 使用Pydantic模型进行数据验证和序列化
• 提供清晰的错误信息和适当的HTTP状态码
3. 认证与授权
• 使用JWT进行无状态认证
• 实现基于角色的访问控制
• 使用FastAPI的依赖注入系统简化权限检查
4. 数据库集成
• 使用SQLAlchemy ORM进行数据库操作
• 实现异步数据库操作提高性能
• 使用数据库迁移工具管理架构变更
5. 缓存策略
• 使用Redis缓存常用数据
• 实现缓存失效机制保持数据一致性
• 使用装饰器简化缓存逻辑
6. 异步处理
• 利用FastAPI的异步特性提高并发性能
• 使用Celery处理后台任务
• 实现任务状态跟踪和结果查询
7. 性能优化
• 优化数据库查询,减少N+1问题
• 使用响应压缩减少传输数据量
• 实现异步端点提高吞吐量
8. 测试策略
• 编写单元测试确保代码质量
• 使用TestClient进行API端点测试
• 实现测试夹具简化测试设置
9. 部署与监控
• 使用Docker容器化应用
• 实现日志记录和监控
• 使用Prometheus和Grafana进行性能监控
通过遵循这些最佳实践,你可以构建出高性能、可维护、可扩展的FastAPI应用,满足现代Web开发的需求。FastAPI的强大功能和简洁设计使其成为构建API的理想选择,无论是小型项目还是大型企业应用,都能胜任。
希望这个实战项目案例能帮助你更好地理解和应用FastAPI,掌握现代Web开发的精髓。
版权声明
1、转载或引用本网站内容(FastAPI实战项目案例分享从API设计到系统优化涵盖认证授权数据库集成缓存策略异步处理等关键技术点助你掌握现代Web开发精髓)须注明原网址及作者(威震华夏关云长),并标明本网站网址(https://pixtech.cc/)。
2、对于不当转载或引用本网站内容而引起的民事纷争、行政处理或其他损失,本网站不承担责任。
3、对不遵守本声明或其他违法、恶意使用本网站内容者,本网站保留追究其法律责任的权利。
本文地址: https://pixtech.cc/thread-41737-1-1.html
|
|