FastAPI 事件驱动参考实现
这是一套在 FastAPI 里实践 DDD/Clean/Hexagonal 的最小可用模板,顺手支持事件驱动。 目标只有一个:把业务装进“可替换的边界”,把基础设施关在外侧。
好处是,换数据库、换消息队列、换日志方案时,Domain 与 Use Case 无感。 这正是 Hexagonal/Clean 所推崇的“内外分离、依赖内向”。
为什么是这套结构?
Hexagonal(端口/适配器)让我们先定义“要什么能力”(端口),再决定“用什么实现”(适配器)。 因此,仓库、事件总线、外部网关统统是“可热插拔”的。
Clean Architecture提醒我们:框架只是细节,业务规则应该不依赖任何 Web/DB/MQ; 控制反转,依赖指向内层。
FastAPI自带“够用的依赖机制”(Depends / yield),而当项目复杂到需要集中装配和覆盖测试时,
dependency-injector 负责把“端口 → 适配器”的绑定统一放进容器。
目录与分层
app/
main.py # 组合根:创建 FastAPI,装配容器,挂路由
containers.py # DI 容器(dependency-injector)
config.py # 配置来源(env/file)
core/
events.py # Event 接口 & EventBus 端口
exceptions.py
message_bus.py # 可选:应用层消息总线封装
domain/
user/
entities.py # 聚合/实体
value_objects.py
events.py # 领域事件(纯 Python 数据)
repositories.py # 仓库端口(接口)
services.py # 领域服务(纯业务规则)
application/
user/
dto.py # 输入/输出 DTO
commands.py # 用例请求
queries.py
handlers.py # 用例实现:拿端口,编排事务与事件
infrastructure/
db/
session.py # 连接与会话工厂(SQLAlchemy 等)
models.py # ORM 模型(别渗入 Domain)
user_repository.py # 仓库适配器(端口实现)
events/
in_memory_bus.py # 事件总线适配器(示例)
dispatcher.py # 统一注册应用事件处理器
messaging/
publisher.py # 对外发布器(Kafka/Redis/NATS 等)
subscriber.py # 外部消息→内部事件
interfaces/
api/
deps.py # API 层通用依赖
v1/
router.py
user_endpoints.py # 只调 Use Case,不写业务
consumers/
worker.py # 消费者进程,和 Web 同容器装配
- Domain:只放业务语义;不引用框架、ORM、HTTP。
- Application:实现用例;只依赖端口(仓库、事件总线);编排事务、发布事件。
- Infrastructure:所有“细节实现”,可以随时替换(例如将 InMemoryBus 换成 Kafka)。
- Interfaces:入站适配器(HTTP、CLI、Consumer)。
这正是 Hexagonal 的Port → Adapter做法;可见好处:替换实现时只改容器绑定。
一条小流程
下面以 注册用户 → 发欢迎邮件 为例,跑通这条流程。
1)Domain:聚合吐出领域事件
# app/domain/user/entities.py
from dataclasses import dataclass, field
from typing import List
from .events import UserRegistered
@dataclass
class User:
id: str
email: str
_events: List[object] = field(default_factory=list, init=False, repr=False)
@staticmethod
def register(id: str, email: str) -> "User":
u = User(id=id, email=email)
u._events.append(UserRegistered(user_id=id, email=email))
return u
def pull_events(self) -> list[object]:
ev, self._events = self._events[:], []
return ev
2)Application:用例实现端口编排 + 事件派发
# app/application/user/handlers.py
from app.application.user.commands import RegisterUserCommand
from app.domain.user.entities import User
from app.domain.user.repositories import UserRepository
from app.core.events import EventBus, Event
class RegisterUserHandler:
def __init__(self, repo: UserRepository, bus: EventBus):
self.repo, self.bus = repo, bus
async def __call__(self, cmd: RegisterUserCommand) -> None:
if await self.repo.by_email(cmd.email):
raise ValueError("Email already registered")
user = User.register(id=cmd.user_id, email=cmd.email)
await self.repo.add(user)
# 领域事件 → 应用事件
app_events = [
Event(name="user.registered", payload={"user_id": e.user_id, "email": e.email})
for e in user.pull_events()
]
await self.bus.publish_many(app_events)
3)Infrastructure:事件总线(示例用内存)
# app/infrastructure/events/in_memory_bus.py
from collections import defaultdict
from typing import Dict, List, Awaitable, Callable
import asyncio
from app.core.events import Event, EventBus
EventHandler = Callable[[Event], Awaitable[None]]
class InMemoryEventBus(EventBus):
def __init__(self) -> None:
self._handlers: Dict[str, List[EventHandler]] = defaultdict(list)
def subscribe(self, name: str, handler: EventHandler) -> None:
self._handlers[name].append(handler)
async def publish(self, event: Event) -> None:
await asyncio.gather(*(h(event) for h in self._handlers.get(event.name, [])), return_exceptions=True)
async def publish_many(self, events: list[Event]) -> None:
for e in events:
await self.publish(e)
4)Interfaces:API 触发用例(FastAPI 只做入站适配器)
# app/interfaces/api/v1/user_endpoints.py
from fastapi import APIRouter, Depends, status
from dependency_injector.wiring import inject, Provide
from app.application.user.commands import RegisterUserCommand
from app.application.user.handlers import RegisterUserHandler
from app.containers import AppContainer
router = APIRouter(prefix="/users", tags=["users"])
@router.post("/", status_code=status.HTTP_201_CREATED)
@inject
async def register_user(
cmd: RegisterUserCommand,
handler: RegisterUserHandler = Depends(Provide[AppContainer.register_user_handler]),
):
await handler(cmd)
return {"status": "created"}
依赖注入
dependency-injector 的 DeclarativeContainer 非常适合集中装配,
也方便在测试里覆盖。使用要点:@inject + Provide[...] + container.wire()。
# app/containers.py
from dependency_injector import containers, providers
from app.infrastructure.events.in_memory_bus import InMemoryEventBus
from app.infrastructure.db.user_repository import SqlAlchemyUserRepository
from app.application.user.handlers import RegisterUserHandler
from app.infrastructure.db.session import create_session_factory
class AppContainer(containers.DeclarativeContainer):
wiring_config = containers.WiringConfiguration(
modules=["app.interfaces.api.v1.user_endpoints", "app.infrastructure.events.dispatcher"]
)
config = providers.Configuration()
db_session_factory = providers.Singleton(create_session_factory, db_url=config.db.url)
event_bus = providers.Singleton(InMemoryEventBus)
user_repository = providers.Factory(SqlAlchemyUserRepository, session_factory=db_session_factory)
register_user_handler = providers.Factory(
RegisterUserHandler, repo=user_repository, bus=event_bus
)
想把 InMemoryBus 换成 Kafka/Redis?
新建 KafkaEventBus 适配器,改 event_bus = providers.Singleton(KafkaEventBus, ...) 即可,
Domain 与 Application 无需改动。这就是 Ports/Adapters 的价值。
FastAPI 的依赖机制依然有用(而且很好用)
简单场景直接用 Depends,需要“前置/后置”的资源,可用 yield 写清理逻辑,
新版本还支持 scope="function"|"request" 控制清理时机。
当对象图复杂、需要集中装配/覆盖时,再由容器承接。 二者并不冲突:把“轻依赖”交给 FastAPI,自底向上的装配交给容器。
事件处理
# app/infrastructure/events/dispatcher.py
from dependency_injector.wiring import inject, Provide
from app.core.events import Event, EventBus
from app.containers import AppContainer
@inject
def setup_subscribers(bus: EventBus = Provide[AppContainer.event_bus]):
async def send_welcome_email(e: Event):
# 调用邮件适配器;失败可走重试/死信
print("Send welcome email to", e.payload["email"])
bus.subscribe("user.registered", send_welcome_email)
在 main.py 启动时 wire 一下并注册:
# app/main.py
from fastapi import FastAPI
from app.containers import AppContainer
from app.interfaces.api.v1 import user_endpoints
from app.infrastructure.events import dispatcher
def create_app() -> FastAPI:
c = AppContainer()
c.config.from_env()
app = FastAPI()
app.container = c # 组合根
app.include_router(user_endpoints.router)
c.wire(modules=[user_endpoints, dispatcher])
dispatcher.setup_subscribers()
return app
app = create_app()
常见落地建议
先用 InMemoryEventBus 跑通流程,再无缝替换成 Kafka/Redis(只改容器绑定)。 Hexagonal 的“延迟决定框架”思想就是这个味儿。
Domain 不要 import ORM / HTTP / MQ;Application 不要直接 new 适配器,都走端口 + 容器绑定。
测试友好:在测试里 container.user_repository.override(FakeRepo()),或直接替换事件总线实现,
做端到端用例测试。dependency-injector 文档对覆盖与异步注入都有示例。
接口快回、异步做事:用例完成后发布事件,把副作用(邮件、埋点、索引更新)交给消费者, 这比把所有副作用都塞在请求链里更稳。