多智能体编排运行时,基于 DDD(Domain-Driven Design)架构从 video-agent 完全重构。
┌─────────────────────────────────────────────────┐ │ Interface Layer │ │ API │ CLI │ Event Consumers │ ├─────────────────────────────────────────────────┤ │ Application Layer │ │ Application Services │ DTOs │ ├─────────────────────────────────────────────────┤ │ Domain Layer │ │ Models (Entities/Aggregates) │ │ Value Objects │ Domain Events │ │ Domain Services │ Repository Interfaces │ ├─────────────────────────────────────────────────┤ │ Infrastructure Layer │ │ Persistence │ Messaging │ External Adapters│ └─────────────────────────────────────────────────┘
依赖方向:Interface → Application → Domain ← Infrastructure
纯业务逻辑,零外部依赖(不依赖数据库、框架、网络)。可以被任何上层引用,但自身不 import 其他层。
| 目录 | 放什么 | 示例 |
|---|---|---|
models/ | 实体(Entity)和聚合根(Aggregate Root),包含业务行为方法 | Task 实体含 start() / complete() / fail() 等状态转换方法 |
value_objects/ | 不可变值对象,用于表达业务概念 | TaskId、RunStatus、TokenBudget |
events/ | 领域事件,表示"已发生的业务事实" | TaskStarted、TaskCompleted、RunFailed |
services/ | 领域服务,涉及多个实体的业务规则 | 路由决策逻辑(输入一个 task,决定用哪个 skill 处理) |
repositories/ | 仓储接口(Port),只定义 Protocol/ABC,不含实现 | TaskRepository(Protocol) 定义 save() / find_by_id() |
# domain/models/task.py
@dataclass
class Task:
id: TaskId
status: RunStatus
def start(self) -> TaskStarted:
self.status = RunStatus.RUNNING
return TaskStarted(task_id=self.id)
# domain/repositories/task_repository.py
class TaskRepository(Protocol):
async def save(self, task: Task) -> None: ...
async def find_by_id(self, task_id: TaskId) -> Task | None: ...
用例编排层,协调 Domain 对象和 Infrastructure 完成一个完整用例。不含业务规则,只做流程串联。
| 目录 | 放什么 | 示例 |
|---|---|---|
services/ | 应用服务,一个方法 = 一个用例 | StartTaskService.execute(cmd) 加载 task → 调用 task.start() → 持久化 → 发布事件 |
dto/ | 数据传输对象,跨层数据转换 | StartTaskRequest、TaskStatusResponse |
# application/services/start_task_service.py
class StartTaskService:
def __init__(self, task_repo: TaskRepository, event_bus: EventBus) -> None:
self._task_repo = task_repo
self._event_bus = event_bus
async def execute(self, request: StartTaskRequest) -> TaskStatusResponse:
task = await self._task_repo.find_by_id(request.task_id)
event = task.start()
await self._task_repo.save(task)
await self._event_bus.publish(event)
return TaskStatusResponse(task_id=task.id, status=task.status)
Domain 层接口(Port)的具体实现(Adapter),以及所有外部系统集成。
| 目录 | 放什么 | 示例 |
|---|---|---|
persistence/ | 仓储实现、ORM 模型、数据库连接 | SqlTaskRepository 实现 TaskRepository 接口,内部用 SQLAlchemy |
messaging/ | 消息队列的生产者/消费者实现 | RedisEventBus 实现 EventBus 接口,通过 Redis Streams 发布事件 |
external/ | 外部服务适配器 | LiteLLMClient 封装 LLM 调用、LangfuseTracer 封装可观测性上报 |
# infrastructure/persistence/sql_task_repository.py
class SqlTaskRepository(TaskRepository):
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def save(self, task: Task) -> None:
record = TaskRecord.from_domain(task)
self._session.add(record)
await self._session.flush()
async def find_by_id(self, task_id: TaskId) -> Task | None:
record = await self._session.get(TaskRecord, str(task_id))
return record.to_domain() if record else None
系统边界,负责接收外部请求并转发给 Application 层。不含业务逻辑,只做协议转换和输入校验。
| 目录 | 放什么 | 示例 |
|---|---|---|
api/ | HTTP 路由、请求/响应模型、中间件 | FastAPI router 接收 POST 请求 → 构造 StartTaskRequest → 调用 StartTaskService |
cli/ | 命令行入口 | python -m interface.cli.run_task --task-id xxx |
consumers/ | 消息消费者,监听队列触发用例 | Redis Stream consumer 收到消息 → 解析 → 调用 Application Service |
# interface/api/task_routes.py
router = APIRouter(prefix="/tasks")
@router.post("/{task_id}/start")
async def start_task(
task_id: str,
service: StartTaskService = Depends(get_start_task_service),
) -> TaskStatusResponse:
return await service.execute(StartTaskRequest(task_id=TaskId(task_id)))
agent-harness/ ├── domain/ # 领域层 │ ├── models/ # 实体、聚合根 │ ├── value_objects/ # 值对象 │ ├── events/ # 领域事件 │ ├── services/ # 领域服务 │ └── repositories/ # 仓储接口(Port) ├── application/ # 应用层 │ ├── services/ # 应用服务(用例编排) │ └── dto/ # 数据传输对象 ├── infrastructure/ # 基础设施层 │ ├── persistence/ # 数据库实现(Adapter) │ ├── messaging/ # 消息队列 │ └── external/ # 外部服务适配器 ├── interface/ # 接口层 │ ├── api/ # HTTP API │ ├── cli/ # CLI 入口 │ └── consumers/ # 事件消费者 ├── tests/ │ ├── unit/ # 单元测试 │ ├── integration/ # 集成测试 │ └── e2e/ # 端到端测试 ├── pyproject.toml └── README.md
uv sync # 安装依赖
uv run pytest # 运行测试
uv run pytest -m "not integration" # 跳过集成测试
uv run ruff check --fix . # Lint
uv run ty check . # 类型检查