logo
0
0
WeChat Login

Agent Harness

多智能体编排运行时,基于 DDD(Domain-Driven Design)架构从 video-agent 完全重构。

架构概览

┌─────────────────────────────────────────────────┐ │ Interface Layer │ │ API │ CLI │ Event Consumers │ ├─────────────────────────────────────────────────┤ │ Application Layer │ │ Application Services │ DTOs │ ├─────────────────────────────────────────────────┤ │ Domain Layer │ │ Models (Entities/Aggregates) │ │ Value Objects │ Domain Events │ │ Domain Services │ Repository Interfaces │ ├─────────────────────────────────────────────────┤ │ Infrastructure Layer │ │ Persistence │ Messaging │ External Adapters│ └─────────────────────────────────────────────────┘

依赖方向:Interface → Application → Domain ← Infrastructure

DDD 分层规范

Domain 层(domain/

纯业务逻辑,零外部依赖(不依赖数据库、框架、网络)。可以被任何上层引用,但自身不 import 其他层。

目录放什么示例
models/实体(Entity)和聚合根(Aggregate Root),包含业务行为方法Task 实体含 start() / complete() / fail() 等状态转换方法
value_objects/不可变值对象,用于表达业务概念TaskIdRunStatusTokenBudget
events/领域事件,表示"已发生的业务事实"TaskStartedTaskCompletedRunFailed
services/领域服务,涉及多个实体的业务规则路由决策逻辑(输入一个 task,决定用哪个 skill 处理)
repositories/仓储接口(Port),只定义 Protocol/ABC,不含实现TaskRepository(Protocol) 定义 save() / find_by_id()
# domain/models/task.py @dataclass class Task: id: TaskId status: RunStatus def start(self) -> TaskStarted: self.status = RunStatus.RUNNING return TaskStarted(task_id=self.id) # domain/repositories/task_repository.py class TaskRepository(Protocol): async def save(self, task: Task) -> None: ... async def find_by_id(self, task_id: TaskId) -> Task | None: ...

Application 层(application/

用例编排层,协调 Domain 对象和 Infrastructure 完成一个完整用例。不含业务规则,只做流程串联。

目录放什么示例
services/应用服务,一个方法 = 一个用例StartTaskService.execute(cmd) 加载 task → 调用 task.start() → 持久化 → 发布事件
dto/数据传输对象,跨层数据转换StartTaskRequestTaskStatusResponse
# application/services/start_task_service.py class StartTaskService: def __init__(self, task_repo: TaskRepository, event_bus: EventBus) -> None: self._task_repo = task_repo self._event_bus = event_bus async def execute(self, request: StartTaskRequest) -> TaskStatusResponse: task = await self._task_repo.find_by_id(request.task_id) event = task.start() await self._task_repo.save(task) await self._event_bus.publish(event) return TaskStatusResponse(task_id=task.id, status=task.status)

Infrastructure 层(infrastructure/

Domain 层接口(Port)的具体实现(Adapter),以及所有外部系统集成。

目录放什么示例
persistence/仓储实现、ORM 模型、数据库连接SqlTaskRepository 实现 TaskRepository 接口,内部用 SQLAlchemy
messaging/消息队列的生产者/消费者实现RedisEventBus 实现 EventBus 接口,通过 Redis Streams 发布事件
external/外部服务适配器LiteLLMClient 封装 LLM 调用、LangfuseTracer 封装可观测性上报
# infrastructure/persistence/sql_task_repository.py class SqlTaskRepository(TaskRepository): def __init__(self, session: AsyncSession) -> None: self._session = session async def save(self, task: Task) -> None: record = TaskRecord.from_domain(task) self._session.add(record) await self._session.flush() async def find_by_id(self, task_id: TaskId) -> Task | None: record = await self._session.get(TaskRecord, str(task_id)) return record.to_domain() if record else None

Interface 层(interface/

系统边界,负责接收外部请求并转发给 Application 层。不含业务逻辑,只做协议转换和输入校验。

目录放什么示例
api/HTTP 路由、请求/响应模型、中间件FastAPI router 接收 POST 请求 → 构造 StartTaskRequest → 调用 StartTaskService
cli/命令行入口python -m interface.cli.run_task --task-id xxx
consumers/消息消费者,监听队列触发用例Redis Stream consumer 收到消息 → 解析 → 调用 Application Service
# interface/api/task_routes.py router = APIRouter(prefix="/tasks") @router.post("/{task_id}/start") async def start_task( task_id: str, service: StartTaskService = Depends(get_start_task_service), ) -> TaskStatusResponse: return await service.execute(StartTaskRequest(task_id=TaskId(task_id)))

项目结构

agent-harness/ ├── domain/ # 领域层 │ ├── models/ # 实体、聚合根 │ ├── value_objects/ # 值对象 │ ├── events/ # 领域事件 │ ├── services/ # 领域服务 │ └── repositories/ # 仓储接口(Port) ├── application/ # 应用层 │ ├── services/ # 应用服务(用例编排) │ └── dto/ # 数据传输对象 ├── infrastructure/ # 基础设施层 │ ├── persistence/ # 数据库实现(Adapter) │ ├── messaging/ # 消息队列 │ └── external/ # 外部服务适配器 ├── interface/ # 接口层 │ ├── api/ # HTTP API │ ├── cli/ # CLI 入口 │ └── consumers/ # 事件消费者 ├── tests/ │ ├── unit/ # 单元测试 │ ├── integration/ # 集成测试 │ └── e2e/ # 端到端测试 ├── pyproject.toml └── README.md

开发指南

环境要求

  • Python >= 3.12
  • uv 包管理器

快速开始

uv sync # 安装依赖 uv run pytest # 运行测试 uv run pytest -m "not integration" # 跳过集成测试 uv run ruff check --fix . # Lint uv run ty check . # 类型检查

About

No description, topics, or website provided.
Language
Python100%