这是一套在 FastAPI 里实践 DDD/Clean/Hexagonal 的最小可用模板,顺手支持事件驱动。 目标只有一个:把业务装进“可替换的边界”,把基础设施关在外侧

好处是,换数据库、换消息队列、换日志方案时,Domain 与 Use Case 无感。 这正是 Hexagonal/Clean 所推崇的“内外分离、依赖内向”。

为什么是这套结构?

Hexagonal(端口/适配器)让我们先定义“要什么能力”(端口),再决定“用什么实现”(适配器)。 因此,仓库、事件总线、外部网关统统是“可热插拔”的。

Clean Architecture提醒我们:框架只是细节,业务规则应该不依赖任何 Web/DB/MQ; 控制反转,依赖指向内层。

FastAPI自带“够用的依赖机制”(Depends / yield),而当项目复杂到需要集中装配和覆盖测试时, dependency-injector 负责把“端口 → 适配器”的绑定统一放进容器。

目录与分层

app/
  main.py                  # 组合根:创建 FastAPI,装配容器,挂路由
  containers.py            # DI 容器(dependency-injector)
  config.py                # 配置来源(env/file)

  core/
    events.py              # Event 接口 & EventBus 端口
    exceptions.py
    message_bus.py         # 可选:应用层消息总线封装

  domain/
    user/
      entities.py          # 聚合/实体
      value_objects.py
      events.py            # 领域事件(纯 Python 数据)
      repositories.py      # 仓库端口(接口)
      services.py          # 领域服务(纯业务规则)

  application/
    user/
      dto.py               # 输入/输出 DTO
      commands.py          # 用例请求
      queries.py
      handlers.py          # 用例实现:拿端口,编排事务与事件

  infrastructure/
    db/
      session.py           # 连接与会话工厂(SQLAlchemy 等)
      models.py            # ORM 模型(别渗入 Domain)
      user_repository.py   # 仓库适配器(端口实现)

    events/
      in_memory_bus.py     # 事件总线适配器(示例)
      dispatcher.py        # 统一注册应用事件处理器

    messaging/
      publisher.py         # 对外发布器(Kafka/Redis/NATS 等)
      subscriber.py        # 外部消息→内部事件

  interfaces/
    api/
      deps.py              # API 层通用依赖
      v1/
        router.py
        user_endpoints.py  # 只调 Use Case,不写业务

    consumers/
      worker.py            # 消费者进程,和 Web 同容器装配
  • Domain:只放业务语义;不引用框架、ORM、HTTP。
  • Application:实现用例;只依赖端口(仓库、事件总线);编排事务、发布事件。
  • Infrastructure:所有“细节实现”,可以随时替换(例如将 InMemoryBus 换成 Kafka)。
  • Interfaces:入站适配器(HTTP、CLI、Consumer)。

这正是 Hexagonal 的Port → Adapter做法;可见好处:替换实现时只改容器绑定

一条小流程

下面以 注册用户 → 发欢迎邮件 为例,跑通这条流程。

1)Domain:聚合吐出领域事件

# app/domain/user/entities.py
from dataclasses import dataclass, field
from typing import List
from .events import UserRegistered

@dataclass
class User:
    id: str
    email: str
    _events: List[object] = field(default_factory=list, init=False, repr=False)

    @staticmethod
    def register(id: str, email: str) -> "User":
        u = User(id=id, email=email)
        u._events.append(UserRegistered(user_id=id, email=email))
        return u

    def pull_events(self) -> list[object]:
        ev, self._events = self._events[:], []
        return ev

2)Application:用例实现端口编排 + 事件派发

# app/application/user/handlers.py
from app.application.user.commands import RegisterUserCommand
from app.domain.user.entities import User
from app.domain.user.repositories import UserRepository
from app.core.events import EventBus, Event

class RegisterUserHandler:
    def __init__(self, repo: UserRepository, bus: EventBus):
        self.repo, self.bus = repo, bus

    async def __call__(self, cmd: RegisterUserCommand) -> None:
        if await self.repo.by_email(cmd.email):
            raise ValueError("Email already registered")

        user = User.register(id=cmd.user_id, email=cmd.email)
        await self.repo.add(user)

        # 领域事件 → 应用事件
        app_events = [
            Event(name="user.registered", payload={"user_id": e.user_id, "email": e.email})
            for e in user.pull_events()
        ]
        await self.bus.publish_many(app_events)

3)Infrastructure:事件总线(示例用内存)

# app/infrastructure/events/in_memory_bus.py
from collections import defaultdict
from typing import Dict, List, Awaitable, Callable
import asyncio
from app.core.events import Event, EventBus

EventHandler = Callable[[Event], Awaitable[None]]

class InMemoryEventBus(EventBus):
    def __init__(self) -> None:
        self._handlers: Dict[str, List[EventHandler]] = defaultdict(list)

    def subscribe(self, name: str, handler: EventHandler) -> None:
        self._handlers[name].append(handler)

    async def publish(self, event: Event) -> None:
        await asyncio.gather(*(h(event) for h in self._handlers.get(event.name, [])), return_exceptions=True)

    async def publish_many(self, events: list[Event]) -> None:
        for e in events:
            await self.publish(e)

4)Interfaces:API 触发用例(FastAPI 只做入站适配器)

# app/interfaces/api/v1/user_endpoints.py
from fastapi import APIRouter, Depends, status
from dependency_injector.wiring import inject, Provide
from app.application.user.commands import RegisterUserCommand
from app.application.user.handlers import RegisterUserHandler
from app.containers import AppContainer

router = APIRouter(prefix="/users", tags=["users"])

@router.post("/", status_code=status.HTTP_201_CREATED)
@inject
async def register_user(
    cmd: RegisterUserCommand,
    handler: RegisterUserHandler = Depends(Provide[AppContainer.register_user_handler]),
):
    await handler(cmd)
    return {"status": "created"}

依赖注入

dependency-injectorDeclarativeContainer 非常适合集中装配, 也方便在测试里覆盖。使用要点:@inject + Provide[...] + container.wire()

# app/containers.py
from dependency_injector import containers, providers
from app.infrastructure.events.in_memory_bus import InMemoryEventBus
from app.infrastructure.db.user_repository import SqlAlchemyUserRepository
from app.application.user.handlers import RegisterUserHandler
from app.infrastructure.db.session import create_session_factory

class AppContainer(containers.DeclarativeContainer):
    wiring_config = containers.WiringConfiguration(
        modules=["app.interfaces.api.v1.user_endpoints", "app.infrastructure.events.dispatcher"]
    )

    config = providers.Configuration()

    db_session_factory = providers.Singleton(create_session_factory, db_url=config.db.url)

    event_bus = providers.Singleton(InMemoryEventBus)

    user_repository = providers.Factory(SqlAlchemyUserRepository, session_factory=db_session_factory)

    register_user_handler = providers.Factory(
        RegisterUserHandler, repo=user_repository, bus=event_bus
    )

想把 InMemoryBus 换成 Kafka/Redis?

新建 KafkaEventBus 适配器,改 event_bus = providers.Singleton(KafkaEventBus, ...) 即可, Domain 与 Application 无需改动。这就是 Ports/Adapters 的价值。

FastAPI 的依赖机制依然有用(而且很好用)

简单场景直接用 Depends,需要“前置/后置”的资源,可用 yield 写清理逻辑, 新版本还支持 scope="function"|"request" 控制清理时机。

当对象图复杂、需要集中装配/覆盖时,再由容器承接。 二者并不冲突:把“轻依赖”交给 FastAPI,自底向上的装配交给容器

事件处理

# app/infrastructure/events/dispatcher.py
from dependency_injector.wiring import inject, Provide
from app.core.events import Event, EventBus
from app.containers import AppContainer

@inject
def setup_subscribers(bus: EventBus = Provide[AppContainer.event_bus]):
    async def send_welcome_email(e: Event):
        # 调用邮件适配器;失败可走重试/死信
        print("Send welcome email to", e.payload["email"])

    bus.subscribe("user.registered", send_welcome_email)

main.py 启动时 wire 一下并注册:

# app/main.py
from fastapi import FastAPI
from app.containers import AppContainer
from app.interfaces.api.v1 import user_endpoints
from app.infrastructure.events import dispatcher

def create_app() -> FastAPI:
    c = AppContainer()
    c.config.from_env()

    app = FastAPI()
    app.container = c  # 组合根

    app.include_router(user_endpoints.router)
    c.wire(modules=[user_endpoints, dispatcher])
    dispatcher.setup_subscribers()

    return app

app = create_app()

常见落地建议

先用 InMemoryEventBus 跑通流程,再无缝替换成 Kafka/Redis(只改容器绑定)。 Hexagonal 的“延迟决定框架”思想就是这个味儿。

Domain 不要 import ORM / HTTP / MQ;Application 不要直接 new 适配器,都走端口 + 容器绑定。

测试友好:在测试里 container.user_repository.override(FakeRepo()),或直接替换事件总线实现, 做端到端用例测试。dependency-injector 文档对覆盖与异步注入都有示例。

接口快回、异步做事:用例完成后发布事件,把副作用(邮件、埋点、索引更新)交给消费者, 这比把所有副作用都塞在请求链里更稳。

延伸阅读