需求:新增 Coordinator 模块并以 Worker 方式管理执行配置#30
围绕本 issue 的需求,建议先把 Coordinator 作为独立的一层来设计:它负责项目管理平台的任务发现、认领、执行记录和结果回写;Worker 负责具体任务执行;现有 runner 先作为 Worker 的底层执行适配器保留。
cmd/auto-harness/main.go 已有 CLI 入口,包括 validate、render、manifest、probe、k8s-pod、serve 和 npc。internal/config/spec.go 已有 worker YAML spec,包含 workspace、skills、credentials、runners、persistence、resourcePolicy 和 schedules。internal/runner/runner.go 已能按 runner id、runner type、task type 或 default runner 选择执行器,并执行命令、注入环境变量、收集 result files、脱敏输出。internal/acp/server.go 已有 ACP HTTP surface,可处理 session、chat message、events、workspace 和文件读取。internal/npc 已有 CNB 事件驱动执行能力,可作为 CNB adapter 的参考,但不应直接等同于 Coordinator。internal/runner。建议在现有 YAML spec 中新增 coordinator、workflows 和 workers 配置块,同时保留现有 runners 兼容旧配置。
coordinator:
instanceId: local-coordinator-1
scanIntervalSeconds: 60
store:
backend: local
path: .auto-harness/coordinator.db
projects:
- id: auto-harness
name: AutoHarness
platform:
type: cnb
url: https://cnb.cool/aiedulab/auto-harness
credentialRef: cnb-api-token
sources:
- id: cnb-issues
type: issues
repo: aiedulab/auto-harness
- id: cnb-pulls
type: pull_requests
repo: aiedulab/auto-harness
claim:
enabled: true
labels: ["auto-worker"]
states: ["open"]
taskTypes: ["code", "review"]
priorities: ["P0", "P1", "P2"]
unassignedOnly: false
maxActiveTasks: 1
workflowRef: auto-harness-dev
routes:
- taskType: code
workerRef: codex-code-worker
- taskType: review
workerRef: codex-review-worker
workflows:
- id: auto-harness-dev
source: file
path: agent.md
workers:
- id: codex-code-worker
runnerRef: codex
runtime: codex
taskTypes: ["code"]
- id: codex-review-worker
runnerRef: codex
runtime: codex
taskTypes: ["review"]
Project 是平台连接和认领策略的管理单元。
核心字段建议包括:
idnameplatform.typeplatform.urlplatform.credentialRefsourcesclaimPolicyworkflowRefroutesenabledTask 是外部平台工作项在 AutoHarness 内部的映射。
核心字段建议包括:
idprojectIDplatformTypesourceIDexternalIDexternalURLkindtitledescriptionlabelsstatuspriorityassigneesclaimStatusclaimedAtclaimedByclaimPolicyIDcurrentExecutionIDlatestResultExecution Record 是一次 Worker 执行尝试的完整记录。
核心字段建议包括:
idtaskIDprojectIDworkerIDworkerRuntimeworkerConfigSnapshotworkflowIDworkflowSnapshotinputplanstatusstageroundsstartedAtfinishedAttimeoutAtlogsRefartifactsfeedbackerrorretryOfneedsHumanExecution Record 建议使用以下状态流:
pending -> claiming -> claimed -> preparing_worker -> running
|-> waiting_input
|-> succeeded
|-> failed
|-> cancelled
|-> timeout
|-> needs_human
Task 可以有多条历史 Execution Record,但同一时间只能有一条 active Execution Record。
Store 层必须强制以下规则:
(project_id, platform_type, source_id, external_id) 唯一确定一个 Task。建议定义统一 adapter 接口:
type Adapter interface {
ListCandidates(ctx context.Context, project Project, source TaskSource) ([]ExternalTask, error)
GetTask(ctx context.Context, ref ExternalTaskRef) (ExternalTask, error)
Claim(ctx context.Context, task ExternalTask, request ClaimRequest) (ClaimResult, error)
Report(ctx context.Context, task ExternalTask, feedback FeedbackRequest) (FeedbackResult, error)
}
CNB 首期行为建议:
建议 Coordinator service 暴露以下能力:
type Service interface {
ScanProject(ctx context.Context, projectID string) (ScanResult, error)
ClaimTask(ctx context.Context, taskID string) (ExecutionRecord, error)
DispatchExecution(ctx context.Context, executionID string) error
HandleWorkerEvent(ctx context.Context, event WorkerEvent) error
ContinueExecution(ctx context.Context, executionID string, input json.RawMessage) error
CancelExecution(ctx context.Context, executionID string, reason string) error
}
扫描与认领流程建议拆为:
load project
-> list external candidates
-> map to internal task
-> evaluate claim policy
-> upsert task
-> try atomic claim
-> create execution
-> resolve workflow snapshot
-> select worker
-> dispatch worker
-> track worker events
-> report feedback
Coordinator 后续可以提供稳定 API 给 UI、CLI 或其他服务:
GET /coordinator/healthzGET /coordinator/projectsPOST /coordinator/projectsGET /coordinator/projects/{project_id}POST /coordinator/projects/{project_id}/scanGET /coordinator/tasksGET /coordinator/tasks/{task_id}POST /coordinator/tasks/{task_id}/claimGET /coordinator/executionsGET /coordinator/executions/{execution_id}POST /coordinator/executions/{execution_id}/continuePOST /coordinator/executions/{execution_id}/cancelGET /coordinator/executions/{execution_id}/eventsGET /coordinator/workflowsGET /coordinator/workflows/{workflow_id}Coordinator 调用 Worker 时,发送任务上下文、workflow snapshot、执行策略和回调地址。
示例请求:
{
"executionId": "exec_123",
"task": {
"id": "task_123",
"type": "code",
"title": "Fix failing validation",
"description": "...",
"sourceUrl": "https://cnb.cool/aiedulab/auto-harness/-/issues/30"
},
"workflow": {
"id": "auto-harness-dev",
"snapshot": "..."
},
"policy": {
"timeoutSeconds": 21600,
"allowCommit": true,
"allowPush": false,
"allowPullRequest": true
},
"callback": {
"url": "http://127.0.0.1:9100/coordinator/worker-events"
}
}
Worker 返回结构化结果:
{
"executionId": "exec_123",
"status": "succeeded",
"summary": "Implemented Coordinator config validation.",
"artifacts": [
{"type": "commit", "id": "abc123", "url": "..."},
{"type": "pull_request", "id": "42", "url": "..."}
],
"logsRef": ".auto-harness/logs/exec_123.log",
"needsHuman": false,
"error": null
}
Workflow 类似 AGENTS.md,用于声明项目级执行规范、角色、约束和交付规则。
建议规则:
多轮任务通过 waiting_input 状态支持。
流程:
running -> waiting_input -> continue requested -> running -> terminal state
约束:
代码审查任务作为 kind: review 一等任务类型处理。
输入建议包括:
输出建议包括:
CNB 首期可以先支持 PR 评论;inline review comments 可以等 adapter 能稳定映射文件和行号后再实现。
建议第一阶段先证明闭环,不一次性做完整分布式调度:
internal/config 中新增 Coordinator、Project、Workflow、Worker route 配置结构和校验。internal/coordinator,实现 Project、Task、Execution Record、ClaimPolicy、Store 接口。internal/platform,定义 adapter 接口并实现 CNB adapter skeleton。coordinator scan 和 coordinator claim CLI 命令。internal/runner.RunCommand。在 Coordinator 设计之后,Worker 建议作为 AutoHarness 的一等执行单元来设计。现有 runner 可以继续保留,但应下沉为 Worker 的底层 runtime adapter;Coordinator 后续只调度 Worker,不直接调 runner。
internal/runner.RunCommand,不必一次性实现远程 Worker 集群。internal/config/spec.go 已有 RunnerSpec,包含 id、type、image、taskTypes、env、command、probe、resultFiles、credentials、credentialRefs 和 mode。internal/runner/runner.go 已能选择 runner、执行 command、注入环境变量、校验凭证、收集 result files,并对输出做凭证脱敏。internal/acp/server.go 已能通过 chat/sendMessage 触发 runner 执行,并通过 SSE 发布执行事件。internal/compose 和 internal/k8s 已能基于当前 spec 渲染 Compose 和 Pod。internal/credentials 已有 scrubber,Worker 的日志、事件、产物和结果必须继续复用脱敏能力。当前缺口是:runner 只是命令执行适配器,不是完整 Worker;它缺少 Worker 配置模型、实例模型、生命周期、标准 API、结构化结果、日志索引、产物清单、输入输出 schema、取消和继续执行能力。
Coordinator
-> Worker API
-> Worker Service
-> Worker Instance Store
-> Lifecycle Manager
-> Runtime Adapter
-> existing internal/runner
-> future codex native runtime
-> future doagent runtime
建议新增目录:
internal/worker/
api.go
service.go
config.go
instance.go
lifecycle.go
runtime.go
result.go
store.go
events.go
保留 internal/runner,但定位为 Worker 的底层执行适配器。
建议在现有 YAML spec 中新增 workers,同时保留 runners 兼容旧配置。
workers:
- id: codex-code-worker
name: Codex Code Worker
description: Executes code tasks through Codex
version: v1
runtime: codex
runnerRef: codex
mode: persistent
taskTypes:
- code
- docs
skills:
- code-commit
- cnb-api
workspace:
cwd: /workspace
repo:
strategy: use_existing
branch:
strategy: task_branch
prefix: codex/
execution:
timeoutSeconds: 21600
maxConcurrency: 1
retry:
maxAttempts: 0
idempotency:
keyScope: execution
result:
schemaRef: code-task-result-v1
resultFiles:
- .auto-harness/result.json
- .auto-harness/summary.md
logs:
backend: local
path: .auto-harness/logs
artifacts:
backend: local
path: .auto-harness/artifacts
credentials:
refs:
- cnb-api-token
callbacks:
enabled: true
- id: codex-review-worker
name: Codex Review Worker
runtime: codex
runnerRef: codex
mode: ephemeral
taskTypes:
- review
result:
schemaRef: review-result-v1
workers 与 runners 的关系建议如下:
workers 是对外调度和生命周期管理单元。runners 是底层命令执行配置。workers 时,系统可以根据 runners 自动生成兼容 Worker。workers 时,Coordinator 只能选择 Worker,不直接选择 runner。type WorkerSpec struct {
ID string
Name string
Description string
Version string
Runtime string
RunnerRef string
Mode string
TaskTypes []string
Skills []string
Workspace WorkerWorkspaceSpec
Execution WorkerExecutionSpec
Result WorkerResultSpec
Logs StorageSpec
Artifacts StorageSpec
Credentials WorkerCredentialSpec
Callbacks WorkerCallbackSpec
}
配置校验规则:
id 必填且全局唯一。runtime 必填,首期支持 codex、doagent 和 command。runnerRef 如果存在,必须引用已有 runners。taskTypes 不能为空。mode 只能是 persistent 或 ephemeral。maxConcurrency 首期默认 1。timeoutSeconds 必须大于 0。credential refs 必须引用已有 credentials。Worker 需要声明 capability,供 Coordinator 做任务路由:
type Capability struct {
WorkerID string
Runtime string
TaskTypes []string
Skills []string
Modes []string
MaxConcurrency int
AvailableSlots int
InputSchemaRef string
OutputSchemaRef string
SupportsCancel bool
SupportsContinue bool
SupportsEvents bool
}
Coordinator 只根据 capability 选择 Worker,不读取底层 runner command。
Worker Instance 是 Worker 配置的一次运行实例。
type Instance struct {
ID string
WorkerID string
ExecutionID string
TaskID string
Runtime string
Mode string
Status string
Stage string
Cwd string
ConfigSnapshot json.RawMessage
CreatedAt time.Time
StartedAt *time.Time
FinishedAt *time.Time
LastHeartbeat *time.Time
CancelReason string
}
约束:
Coordinator 派发给 Worker 的输入必须标准化:
type ExecutionRequest struct {
ExecutionID string
Task TaskInput
Workflow WorkflowInput
Policy ExecutionPolicy
Callback CallbackSpec
Metadata map[string]string
}
type TaskInput struct {
ID string
Type string
Title string
Description string
SourceURL string
Platform string
ExternalID string
Repo string
PullRequest *PullRequestInput
Attachments []AttachmentRef
}
代码审查任务可通过 PullRequestInput 携带 PR/MR/diff 上下文:
type PullRequestInput struct {
ID string
URL string
BaseRef string
HeadRef string
DiffURL string
DiffText string
}
Worker 必须返回结构化结果,不能只返回 stdout:
type Result struct {
ExecutionID string
Status string
Summary string
Text string
Artifacts []ArtifactRef
LogsRef string
Metrics ExecutionMetrics
NeedsHuman bool
Error *ExecutionError
}
代码任务产物建议包括:
Review 任务产物建议包括:
Worker Instance 状态建议为:
pending -> preparing -> ready -> running -> waiting_input -> succeeded
|-> failed
|-> cancelled
|-> timeout
状态语义:
pending:实例已创建但未开始准备。preparing:准备工作目录、仓库、skills、凭证和运行环境。ready:环境就绪,可以启动执行。running:正在执行任务。waiting_input:Worker 需要补充输入,并保留上下文。succeeded:执行成功,已产生结构化结果。failed:执行失败,已记录错误和可重试建议。cancelled:外部取消。timeout:超过执行策略允许时间。状态转换必须由 Lifecycle Manager 统一处理,避免直接改状态。
Worker 需要向 Coordinator 产出事件:
type Event struct {
ID string
WorkerID string
InstanceID string
ExecutionID string
Type string
Stage string
Message string
Payload json.RawMessage
CreatedAt time.Time
}
事件类型建议包括:
worker.instance.createdworker.instance.preparingworker.instance.readyworker.execution.startedworker.execution.logworker.execution.progressworker.execution.waiting_inputworker.execution.succeededworker.execution.failedworker.execution.cancelledworker.execution.timeoutWorker 模块需要提供标准 API,第一阶段可以同时支持 in-process service 和 HTTP handler。
GET /worker/healthzGET /worker/capabilitiesPOST /worker/instancesGET /worker/instances/{instance_id}POST /worker/instances/{instance_id}/startPOST /worker/instances/{instance_id}/continuePOST /worker/instances/{instance_id}/cancelGET /worker/instances/{instance_id}/logsGET /worker/instances/{instance_id}/resultGET /worker/instances/{instance_id}/eventsAPI 规则:
POST /worker/instances 只创建实例,不立即执行。start 必须携带 executionId 和完整 ExecutionRequest。start 同一个 executionId 必须幂等。continue 只能用于 waiting_input 状态。cancel 应尽力终止底层 runtime。result 在终态前可以返回 partial result,但必须标记 complete: false。Worker 通过 Runtime Adapter 调用不同 runtime:
type RuntimeAdapter interface {
Prepare(ctx context.Context, instance Instance, spec WorkerSpec) error
Start(ctx context.Context, instance Instance, request ExecutionRequest, sink EventSink) (Result, error)
Continue(ctx context.Context, instance Instance, input ContinueRequest, sink EventSink) (Result, error)
Cancel(ctx context.Context, instance Instance, reason string) error
Probe(ctx context.Context, spec WorkerSpec) (ProbeResult, error)
}
第一阶段实现 CommandRuntimeAdapter:
worker.runnerRef 找到 RunnerSpec。runner.RunCommand 执行。runner.Result 转换为 worker.Result。resultFiles 转换为 artifacts 或 structured result。后续可扩展:
CodexRuntimeAdapter:直接管理 Codex CLI/会话。DoAgentRuntimeAdapter:调用 DoAgent ACP/HTTP。RemoteRuntimeAdapter:通过远程 Worker HTTP API 执行。KubernetesRuntimeAdapter:通过 Pod/Job 创建隔离环境。Worker 需要实例存储:
type Store interface {
CreateInstance(ctx context.Context, instance Instance) error
GetInstance(ctx context.Context, id string) (Instance, error)
UpdateInstance(ctx context.Context, id string, patch InstancePatch) error
AppendEvent(ctx context.Context, event Event) error
ListEvents(ctx context.Context, instanceID string) ([]Event, error)
SaveResult(ctx context.Context, instanceID string, result Result) error
GetResult(ctx context.Context, instanceID string) (Result, error)
}
第一阶段可以使用 memory store 或 local file store,Coordinator 闭环稳定后再统一迁移 SQLite。
Worker 输出拆为三类:
目录建议:
.auto-harness/
worker/
instances/
events/
results/
logs/
<execution_id>.log
artifacts/
<execution_id>/
所有用户可见内容都必须通过 scrubber。
internal/runner。executionId,用于幂等和追踪。现有 internal/acp/server.go 可以逐步桥接 Worker:
session/new 创建 ACP session 时选择 Worker 或 Worker route。chat/sendMessage 内部创建 Worker instance 并启动 execution。/events 可以转发 Worker events。session/artifacts 可以读取 Worker artifacts。这样可以保持 AgentForge 现有 ACP 调用方式不破坏,同时让新的 Coordinator 使用 Worker API。
迁移分三层:
workers 配置,则从 runners 自动生成 Worker。workers[].runnerRef 指向现有 runner。自动生成规则:
worker.id = runner.idworker.runtime = runner.typeworker.taskTypes = runner.taskTypesworker.mode = runner.modeworker.runnerRef = runner.id配置测试:
service 测试:
runtime adapter 测试:
ACP 兼容测试:
chat/sendMessage 可以通过 Worker service 执行。建议第一阶段这样落地:
internal/config 中新增 WorkerSpec 及相关嵌套配置结构。Spec.Worker(id string) 和 runner-to-worker 自动兼容逻辑。internal/worker 包,定义 Service、Store、Instance、Result、Event 和 RuntimeAdapter。CommandRuntimeAdapter,包装现有 internal/runner.RunCommand。cmd/auto-harness 增加 worker probe、worker start 或 worker serve 子命令。internal/acp 的 chat/sendMessage 内部切到 Worker Service,保留原协议。这个切片完成后,Coordinator 就可以先通过 in-process Worker Service 派发任务,而不需要等待完整远程 Worker HTTP 服务或 Kubernetes 动态调度。
persistent Worker 是否首期就需要保留会话上下文,还是先只作为配置语义保留?resultFiles 应该直接作为 artifacts,还是要求 Worker 产出标准 result.json?@npc/DeepSeek-TUI(DeepSeek-TUI-pro) 请对本 issue 中的需求与设计做一次评审,并直接提出修复建议。
评审范围:
请按以下格式输出:
请不要只做泛泛评价,重点直接指出需求和设计中需要修复的地方。
DeepSeek-TUI-pro
@wwyz(帷幄庸者) 以下是针对 Issue #30「新增 Coordinator 模块并以 Worker 方式管理执行配置」的完整评审报告。
需求整体方向正确——Coordinator 负责发现/认领/派发,Worker 负责执行,边界划分清晰。当前文档覆盖了核心流程的 80%,可以作为第一阶段的蓝图启动。但存在 4 个 P0 级缺陷(其中 3 个涉及并发安全、1 个涉及迁移兼容性),不修复则无法进入编码阶段。此外,P1 级问题集中在协议细节、数据存储和平台抽象层,如果不在设计阶段澄清,后期重构成本会很高。建议在动手写 Go 代码之前先把 P0 问题拍死在设计文档里。
| # | 问题 | 影响范围 |
|---|---|---|
| P0-1 | 多个 Coordinator 实例并发运行时的任务认领幂等缺少并发安全机制 | Coordinator 核心正确性 |
| P0-2 | 一个 Task 同一时间只能由一个 Worker 实现只有约束声明,没有锁/租约实现方案 | 并发执行安全 |
| P0-3 | Coordinator / Task / Execution Record 的数据持久化方案完全缺失 | 系统可用性 |
| P0-4 | 现有 RunnerSpec 迁移为 Worker 配置的路径完全未定义 | 兼容性与落地可行性 |
| # | 问题 | 影响范围 |
|---|---|---|
| P1-1 | Coordinator 与 Worker 交互协议未定义 wire format | 模块集成 |
| P1-2 | Coordinator 回调/通知机制未指定(轮询 vs 推送) | 实时性与资源消耗 |
| P1-3 | Worker 注册与发现机制缺失 | 动态调度可行性 |
| P1-4 | 平台抽象层接口未定义,CNB 先行的风险未评估 | 多平台扩展 |
| P1-5 | 多轮任务终止条件与最大轮次未定义 | 资源泄漏风险 |
| P1-6 | Worker API 鉴权与版本化未涉及 | 安全性 |
| P1-7 | 代码审查任务的结构化输出 schema 未定义 | 平台回写可用性 |
| P1-8 | 凭证隔离与轮换策略未涉及 | 安全合规 |
| # | 问题 | 影响范围 |
|---|---|---|
| P2-1 | 测试策略完全空白 | 质量保障 |
| P2-2 | 可观测性(metrics / tracing)未提及 | 运维排障 |
| P2-3 | 产物生命周期与清理策略未定义 | 存储成本 |
| P2-4 | 错误分类体系(瞬时 / 永久 / 需人工)缺失 | 自动重试策略有效性 |
| P2-5 | Worker 预热/池化策略未考虑 | 冷启动延迟 |
问题:文档要求多个 Coordinator 实例并发运行时任务认领幂等,但只有概念声明,没有具体机制。如果两个 Coordinator 同时扫描到同一候选任务且同时认领,会导致重复 Worker 拉起。
修复建议:
if_unassigned: true 或 if_label_not_exists: auto-harness/claimed 的前置条件。问题:一个 Task 同一时间只能由一个 Worker 实现只有声明,无实现方案。
修复建议:
lease_expires_at 字段(当前时间 + 30s)timeout,然后允许 Task 重新进入可派发状态问题:Project、Task、Execution Record、Worker Instance 等数据模型已有字段定义,但完全没有说明存储在哪里。
修复建议:
Project.id(显式主键,UUID)Task.id(UUID)+ Task.external_unique_key(platform_type + project_id + external_task_id 联合唯一索引)ExecutionRecord.lease_expires_at(实现 P0-2 租约)ExecutionRecord.worker_config_snapshot_sha256(快照完整性校验)问题:三行需求写了需要支持将现有配置迁移为 Worker 配置,但没有具体方案。当前 internal/config/spec.go 的 RunnerSpec 有 12 个字段,Worker 提案中新增了大量字段(input_schema、output_schema、retry_policy 等),两者结构差异大。
修复建议:
config.Spec 中新增 Workers []WorkerSpec 字段,Runners []RunnerSpec 保留但标记 @deprecatedconfig.Load() 内部执行自动升级——如果检测到 runners 段但没有 workers 段,自动将 RunnerSpec 映射为 WorkerSpec(字段一一对应,新增字段用默认值)runners 段不再被解析,视为错误RunnerSpec.ID -> WorkerSpec.IDRunnerSpec.Type -> WorkerSpec.RuntimeTypeRunnerSpec.TaskTypes -> WorkerSpec.SupportedTaskTypesRunnerSpec.Image -> WorkerSpec.Runtime.ImageRunnerSpec.Command -> WorkerSpec.Runtime.CommandRunnerSpec.Env -> WorkerSpec.Runtime.EnvRunnerSpec.Credentials -> WorkerSpec.CredentialsRunnerSpec.Mode -> WorkerSpec.Lifecycle.Mode(persistent/ephemeral)examples/codex-worker.yaml 在执行 auto-harness validate 时必须零修改通过校验修复建议:采用 HTTP + JSON-RPC 2.0(与现有 ACP server 的 /rpc 端点协议一致),理由:
internal/acp/server.go 已经是 JSON-RPC 2.0 实现,可以复用基础设施worker.create、worker.status、worker.start、worker.cancel、worker.logs、worker.result、worker.continue、worker.healthexecution_context 作为 worker.start 的 params,Worker 返回的 execution_result 作为 response.result修复建议:首期采用轮询 + 长轮询,预留 webhook 扩展点:
worker.status 轮询 Worker 状态(间隔可配置,默认 5s)worker.status 支持长轮询(?wait=30s),减少空轮询callback_url 字段已预留,届时实现 webhook 即可修复建议:首期采用静态配置文件 + 本地注册表:
workers 列表(host:port + capabilities)k8s-pod 渲染器)修复建议:定义 PlatformDriver 接口,CNB 作为第一个实现:
type PlatformDriver interface {
DiscoverTasks(ctx context.Context, project Project) ([]ExternalTask, error)
ClaimTask(ctx context.Context, project Project, task ExternalTask) (bool, error)
UpdateTaskStatus(ctx context.Context, project Project, task ExternalTask, result ExecutionResult) error
PostComment(ctx context.Context, project Project, task ExternalTask, comment string) error
PlatformType() string
}
internal/coordinator/platforms/cnb/修复建议:
max_rounds 配置(默认 10),超出后自动标记 failed 并回写超过最大交互轮次waiting_input_timeout(默认 24h),超时自动取消worker.continue API 或外部平台评论触发修复建议:
Authorization: Bearer <key> header 验证AUTO_HARNESS_API_KEYAccept-Version: 2025-05-01 header 做版本协商修复建议:定义 ReviewResult schema:
{
"review_id": "uuid",
"overall_verdict": "approved|changes_requested|commented",
"findings": [
{
"severity": "critical|major|minor|nit",
"file_path": "internal/config/spec.go",
"line_start": 260,
"line_end": 270,
"category": "security|bug|performance|style|design",
"summary": "...",
"suggestion": "..."
}
],
"summary": "..."
}
修复建议:
| 原设计 | 问题 | 调整建议 |
|---|---|---|
| Project 配置无主键 | 无法在 Task 中唯一引用 | 增加 project_id(UUID)作为主键 |
| Task 认领标记实现不明 | 只是概念声明 | 见 P0-1 修复建议,增加 CAS 语义 |
| 一个 Task 同一时间只能由一个 Worker | 无实现细节 | 见 P0-2 修复建议,增加租约机制 |
| Worker API 无鉴权 | 裸 HTTP 不安全 | 见 P1-6 修复建议,增加 shared key |
| 平台类型用字符串枚举 | 无编译期检查 | 用类型常量或枚举 |
| Worker 注册无机制 | Coordinator 无法发现 Worker | 见 P1-3 修复建议,首期静态配置 |
| RunnerSpec 到 Worker 迁移无方案 | 已有配置可能失效 | 见 P0-4 修复建议,分阶段迁移 |
按依赖关系和风险高低排序:
Phase 1-A: 数据层 + Coordinator 认领闭环(2-3 周)
PlatformDriver 接口PlatformDriver(DiscoverTasks + ClaimTask + PostComment)Phase 1-B: Worker 配置层重构(1-2 周)
6. 在 config.Spec 中新增 Workers []WorkerSpec
7. 实现 RunnerSpec 到 WorkerSpec 自动迁移逻辑
8. 更新 validate 命令兼容新旧两种配置格式
9. 编写迁移单元测试
Phase 1-C: Worker API + 调度闭环(2-3 周)
10. 实现 Worker 标准 API(基于现有 internal/acp/server.go 扩展)
11. 实现 Worker 生命周期状态机
12. 实现 Coordinator 到 Worker 派发逻辑(含 Worker 选择、租约管理)
13. 实现结果回写(Worker result -> Coordinator -> Platform)
Phase 2: 多轮任务 + 代码审查(2 周)
14. 实现 waiting_input 状态与 worker.continue API
15. 实现多轮上下文保存与传递
16. 实现代码审查任务的 diff 获取与结构化输出
Phase 3: 运维增强(1-2 周)
17. 多 Coordinator 并发部署(基于数据库乐观锁,无需额外组件)
18. 日志/产物生命周期管理
19. metrics + health check 完善
建议在进入编码前,将 P0-1 到 P0-4 的修复方案落实到 specs/issue-30-coordinator-worker.md 设计文档中。如有需要我可以协助起草该文档中的技术方案部分。
deepseek-v4-pro
worker如何进行销毁回收管理,如何持久化。代码当前实现到什么程度,要考虑现状。Coordinator如何定时任务配置和调度。 @npc/DeepSeek-TUI(DeepSeek-TUI-pro)
DeepSeek-TUI-pro
@wwyz(帷幄庸者) 以下是基于评审报告的进一步完善设计,重点解决了 4 个 P0 阻塞级问题、8 个 P1 重要问题,并对 P2 给出了建议。在设计明确后再进入编码阶段。
核心机制:二层幂等 + 乐观锁
认领流程(伪代码):
func (s *Service) TryClaim(ctx context.Context, task *Task) (*ClaimResult, error) {
store := s.store
platform := s.platforms[task.PlatformType]
// 第一层:外部平台 CAS 操作
// CNB: 调用 Issue API 添加 label,要求 label 不存在时才成功
// 等价 SQL 语义: INSERT INTO labels WHERE issue_id=X AND label='auto-harness/claimed'
// 如果已存在则返回 conflict
externalClaimed, err := platform.TryClaimTask(ctx, task.ExternalRef())
if err != nil {
return nil, fmt.Errorf("platform claim failed: %w", err)
}
if !externalClaimed {
return &ClaimResult{Claimed: false, Reason: "already_claimed_on_platform"}, nil
}
// 第二层:内部存储乐观锁
// UPDATE tasks SET claim_status='claimed', claimed_by=$instanceID,
// claimed_at=NOW(), version=version+1
// WHERE id=$taskID AND version=$currentVersion
// AND claim_status='unclaimed'
err = store.ClaimTask(ctx, task.ID, s.instanceID, task.Version)
if errors.Is(err, ErrVersionConflict) {
// 另一个 Coordinator 抢先了一步,回滚外部平台标记
_ = platform.ReleaseTaskClaim(ctx, task.ExternalRef())
return &ClaimResult{Claimed: false, Reason: "version_conflict"}, nil
}
return &ClaimResult{Claimed: true}, err
}
外部平台 CAS 操作定义:
PUT /issues/{iid}/labels + 服务端去重保证(标签幂等写入),写入前检查是否已被 auto-harness/claimedPOST /repos/{owner}/{repo}/issues/{number}/labels + if: label not exists关键约束:
Claim 方法必须是 TryClaim 语义Execution Record 租约设计:
Execution Record 表增加字段:
- lease_expires_at TIMESTAMP -- 租约过期时间
- lease_holder VARCHAR -- 持有租约的 Worker ID
- lease_renewals INT -- 续约次数
租约生命周期:
1. 创建 Execution Record 时设定初始租约
INSERT INTO execution_records (... lease_expires_at, lease_holder ...)
VALUES (... NOW() + 30s, $workerID ...)
2. Worker 运行期间定期续约(每 10s)
UPDATE execution_records
SET lease_expires_at = NOW() + 30s, lease_renewals = lease_renewals + 1
WHERE id = $executionID AND lease_holder = $workerID
RETURNING id -- 0 rows = 租约已被别人接管或任务已终结
3. Coordinator 租约清理任务(每 15s)
UPDATE execution_records
SET status = 'timeout', finished_at = NOW()
WHERE status IN ('pending', 'running', 'waiting_input')
AND lease_expires_at < NOW()
派发前置检查:
func (s *Service) DispatchExecution(ctx context.Context, execID string) error {
exec, err := s.store.GetExecution(ctx, execID)
// 检查是否存在有效租约
activeExec, err := s.store.FindActiveExecution(ctx, exec.TaskID)
if activeExec != nil && activeExec.ID != execID {
if time.Now().Before(activeExec.LeaseExpiresAt) {
return ErrTaskHasActiveExecution
}
// 租约已过期,将旧 execution 标记 timeout
_ = s.store.UpdateExecutionStatus(ctx, activeExec.ID, StatusTimeout)
}
// 设置租约
exec.LeaseExpiresAt = time.Now().Add(30 * time.Second)
exec.LeaseHolder = selectedWorkerID
return s.store.UpdateExecution(ctx, exec)
}
租约过期接管流程:
Coordinator A 崩溃
-> Worker A 无法续约
-> 租约在 30s 后过期
-> Coordinator B 的清理任务检测到过期租约
-> 将 execution 标记为 timeout
-> Task 的 current_execution_id 清空(通过 store 层 CAS)
-> Task 恢复为可重新派发状态
-> Coordinator B 创建新 Execution Record 并派发给 Worker B
选择 SQLite 的理由:
database/sql + mattn/go-sqlite3完整 DDL:
-- 启用 WAL 模式提升并发性能
PRAGMA journal_mode=WAL;
PRAGMA foreign_keys=ON;
-- Project 表
CREATE TABLE IF NOT EXISTS projects (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
platform TEXT NOT NULL, -- 'cnb' | 'github' | 'jira'
url TEXT NOT NULL,
credential_ref TEXT NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
config TEXT NOT NULL DEFAULT '{}', -- JSON: sources, claim_policy, routes
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
-- Task 表
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL REFERENCES projects(id),
platform_type TEXT NOT NULL,
source_id TEXT NOT NULL,
external_id TEXT NOT NULL,
external_url TEXT NOT NULL,
kind TEXT NOT NULL, -- 'code' | 'review' | 'docs' | 'fix'
title TEXT NOT NULL,
description TEXT DEFAULT '',
labels TEXT DEFAULT '[]', -- JSON array
status TEXT NOT NULL DEFAULT 'open',
priority TEXT DEFAULT '',
assignees TEXT DEFAULT '[]', -- JSON array
-- 认领状态
claim_status TEXT NOT NULL DEFAULT 'unclaimed', -- unclaimed | claimed | claim_failed
claimed_at TEXT,
claimed_by TEXT,
claim_policy_id TEXT,
claim_error TEXT,
-- 执行状态
current_execution_id TEXT,
latest_result TEXT, -- JSON: {status, summary, at}
-- 并发控制
version INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
-- 唯一约束:外部平台任务映射唯一
UNIQUE(platform_type, project_id, source_id, external_id)
);
CREATE INDEX idx_tasks_project_id ON tasks(project_id);
CREATE INDEX idx_tasks_claim_status ON tasks(claim_status);
CREATE INDEX idx_tasks_current_execution ON tasks(current_execution_id);
-- Execution Record 表
CREATE TABLE IF NOT EXISTS execution_records (
id TEXT PRIMARY KEY,
task_id TEXT NOT NULL REFERENCES tasks(id),
project_id TEXT NOT NULL REFERENCES projects(id),
worker_id TEXT NOT NULL,
worker_runtime TEXT NOT NULL,
worker_config_snapshot TEXT NOT NULL, -- JSON
workflow_id TEXT,
workflow_snapshot TEXT, -- Markdown/Text
execution_input TEXT NOT NULL, -- JSON: TaskInput
execution_plan TEXT, -- JSON
status TEXT NOT NULL DEFAULT 'pending',
stage TEXT NOT NULL DEFAULT 'init',
-- 多轮任务
rounds TEXT DEFAULT '[]', -- JSON array of Round
max_rounds INTEGER NOT NULL DEFAULT 10,
current_round INTEGER NOT NULL DEFAULT 0,
-- 租约
lease_expires_at TEXT,
lease_holder TEXT,
lease_renewals INTEGER NOT NULL DEFAULT 0,
-- 时间
started_at TEXT,
finished_at TEXT,
duration_ms INTEGER,
timeout_at TEXT,
-- 结果
logs_ref TEXT,
artifacts TEXT DEFAULT '[]', -- JSON array of ArtifactRef
result_summary TEXT,
result_status TEXT,
-- 错误与重试
error_message TEXT,
error_stack TEXT,
error_category TEXT, -- transient | permanent | needs_human
retry_of TEXT,
retry_count INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL DEFAULT 3,
needs_human INTEGER NOT NULL DEFAULT 0,
-- 回写
feedback_platform TEXT, -- JSON: feedback sent to platform
feedback_result TEXT, -- JSON: platform response
feedback_at TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_execution_records_task_id ON execution_records(task_id);
CREATE INDEX idx_execution_records_status ON execution_records(status);
CREATE INDEX idx_execution_records_lease ON execution_records(lease_expires_at)
WHERE status IN ('pending', 'running', 'waiting_input');
CREATE INDEX idx_execution_records_worker ON execution_records(worker_id);
-- Worker Instance 表
CREATE TABLE IF NOT EXISTS worker_instances (
id TEXT PRIMARY KEY,
worker_id TEXT NOT NULL,
execution_id TEXT REFERENCES execution_records(id),
task_id TEXT,
runtime TEXT NOT NULL,
mode TEXT NOT NULL DEFAULT 'ephemeral',
status TEXT NOT NULL DEFAULT 'pending',
stage TEXT NOT NULL DEFAULT 'init',
cwd TEXT NOT NULL,
config_snapshot TEXT NOT NULL, -- JSON
waiting_input_since TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
started_at TEXT,
finished_at TEXT,
last_heartbeat TEXT,
cancel_reason TEXT
);
CREATE INDEX idx_worker_instances_status ON worker_instances(status);
CREATE INDEX idx_worker_instances_execution ON worker_instances(execution_id);
-- Worker Event 表(append-only)
CREATE TABLE IF NOT EXISTS worker_events (
id TEXT PRIMARY KEY,
instance_id TEXT NOT NULL REFERENCES worker_instances(id),
execution_id TEXT NOT NULL,
type TEXT NOT NULL,
stage TEXT,
message TEXT,
payload TEXT, -- JSON
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_worker_events_instance ON worker_events(instance_id);
-- Claim Record 表(审计用)
CREATE TABLE IF NOT EXISTS claim_records (
id TEXT PRIMARY KEY,
task_id TEXT NOT NULL REFERENCES tasks(id),
project_id TEXT NOT NULL,
coordinator_id TEXT NOT NULL,
claim_status TEXT NOT NULL, -- 'attempted' | 'succeeded' | 'failed' | 'conflict'
claim_reason TEXT,
platform_result TEXT, -- JSON
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_claim_records_task ON claim_records(task_id);
阶段 1: 并行兼容(第 1-2 周)
# auto-harness.yaml 同时支持两种配置
workers: # 新增
- id: codex-worker
runtime: codex
runnerRef: codex # 引用现有 runner
...
runners: # 保留,标记 @deprecated
- id: codex
type: codex
...
自动升级映射(config.Load 内部实现):
// 如果没有显式 workers 但有 runners,自动生成兼容 Worker
func (s *Spec) ApplyWorkerCompat() {
if len(s.Workers) > 0 {
return // 已有显式 workers,跳过自动生成
}
for _, runner := range s.Runners {
worker := WorkerSpec{
ID: runner.ID,
Name: runner.ID,
Runtime: runner.Type,
RunnerRef: runner.ID,
Mode: runner.Mode,
TaskTypes: runner.TaskTypes,
Workspace: WorkerWorkspaceSpec{Cwd: s.Workspace.Cwd},
Credentials: WorkerCredentialSpec{Refs: runner.CredentialRefs},
}
// 映射超时
if runner.Command.TimeoutSeconds > 0 {
worker.Execution.TimeoutSeconds = runner.Command.TimeoutSeconds
}
s.Workers = append(s.Workers, worker)
}
}
向后兼容验证矩阵:
| 场景 | validate 行为 |
|---|---|
| 只有 runners | ✅ 自动生成 workers,通过 |
| 只有 workers | ✅ 通过 |
| 同时有 runners + workers | ✅ 通过(workers 优先) |
| 两者都没有 | ❌ 报错 |
| runnerRef 引用了不存在的 runner | ❌ 报错 |
| workers[].id 重复 | ❌ 报错 |
阶段 2: 迁移引导(第 3-4 周)
auto-harness migrate 命令:将 runners 转换为 workers 并输出新 YAMLauto-harness validate --strict:不自动生成,要求显式 workers阶段 3: 废弃 runners(远期)
runners 段解析时输出 warningrunners 支持复用现有 internal/acp/server.go 的 JSON-RPC 2.0 基础设施:
Coordinator (client) Worker (server)
| |
|--- worker.create(params) ------------>|
|<-- {result: {instance_id, status}} ---|
| |
|--- worker.start(params) ------------->|
| params: {execution_id, |
| task_context, |
| workflow_snapshot, |
| execution_policy, |
| callback_config} |
|<-- {result: {accepted, instance_id}} -|
| |
|--- worker.status(params) ------------>|
| params: {execution_id, ?wait=30s} |
|<-- {result: {status, stage, ...}} ----|
| |
|--- worker.result(params) ------------>|
|<-- {result: {status, summary, ...}} --|
方法映射:
| JSON-RPC Method | HTTP Path | 说明 |
|---|---|---|
worker.create | POST /rpc | 创建 Worker Instance |
worker.start | POST /rpc | 启动执行 |
worker.status | POST /rpc | 查询状态(支持长轮询 ?wait=30s) |
worker.continue | POST /rpc | 继续等待输入的任务 |
worker.cancel | POST /rpc | 取消执行 |
worker.result | POST /rpc | 获取执行结果 |
worker.logs | POST /rpc | 获取执行日志 |
worker.health | POST /rpc | 健康检查 + 能力查询 |
首期方案:轮询 + 长轮询
// Coordinator 调度后轮询 Worker 状态
func (s *Service) watchExecution(ctx context.Context, execID string) {
ticker := time.NewTicker(s.pollInterval) // 默认 5s
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
status, err := s.workerClient.Status(ctx, execID, 30*time.Second) // 长轮询 30s
if err != nil {
s.handleWatchError(execID, err)
continue
}
s.processStatusChange(execID, status)
if status.IsTerminal() {
s.finalizeExecution(execID, status)
return
}
}
}
}
Webhook 扩展点(预留,非首期):
type CallbackConfig struct {
URL string `json:"url"`
Headers map[string]string `json:"headers"`
Events []string `json:"events"` // subscribed event types
}
// Worker 在 ExecutionRequest.Callback 中接收此配置
// 当 Worker 有回调能力时,主动 POST 事件到 Coordinator
首期方案:
coordinator:
workers:
- id: codex-local
url: http://127.0.0.1:9100 # in-process → 不填 url 或填 localhost
apiKey: "${AUTO_HARNESS_API_KEY}"
# 后续扩展:
# - id: codex-remote
# url: http://10.0.1.5:9100
# apiKey: "${REMOTE_WORKER_KEY}"
// 启动时对所有配置的 Worker 做健康检查
func (r *WorkerRegistry) Discover(ctx context.Context) ([]WorkerCapability, error) {
var caps []WorkerCapability
for _, cfg := range r.config.Workers {
if cfg.URL == "" {
// in-process Worker,直接获取本地能力
caps = append(caps, r.localWorker.Capability())
continue
}
cap, err := r.probeRemote(ctx, cfg)
if err != nil {
slog.Warn("worker probe failed", "worker", cfg.ID, "error", err)
continue
}
caps = append(caps, cap)
}
return caps, nil
}
// internal/platform/driver.go
// PlatformDriver 屏蔽不同项目管理平台的差异
type PlatformDriver interface {
// PlatformType 返回平台类型标识
PlatformType() string
// DiscoverTasks 发现候选任务
// 根据 project 的 sources 配置,获取符合条件的任务列表
DiscoverTasks(ctx context.Context, project Project, sources []TaskSource) ([]ExternalTask, error)
// TryClaimTask 尝试以原子方式认领任务
// 返回 true 表示认领成功,false 表示任务已被认领(幂等保护)
// 实现必须使用平台原生 CAS 语义(如 CNB label 去重 / GitHub issue lock)
TryClaimTask(ctx context.Context, project Project, task ExternalTask, claim ClaimRequest) (bool, error)
// ReleaseTaskClaim 释放任务认领(认领冲突回滚时调用)
ReleaseTaskClaim(ctx context.Context, project Project, task ExternalTask) error
// PostFeedback 将执行结果回写到平台
PostFeedback(ctx context.Context, project Project, task ExternalTask, feedback FeedbackRequest) (FeedbackResult, error)
// GetTask 获取单个任务详情(用于验证任务状态)
GetTask(ctx context.Context, project Project, externalID string) (ExternalTask, error)
}
type ExternalTask struct {
ID string // 平台内唯一 ID
URL string // 任务 URL
Kind string // issue | pull_request | story | task
Title string
Description string
Labels []string
Status string
Priority string
Assignees []string
Metadata map[string]string // 平台特有字段
}
CNB 实现目录: internal/platform/cnb/
driver.go — 实现 PlatformDriverissues.go — Issue 相关 API 封装pulls.go — Pull Request 相关 API 封装type ExecutionPolicy struct {
TimeoutSeconds int `json:"timeoutSeconds" yaml:"timeoutSeconds"`
MaxRounds int `json:"maxRounds" yaml:"maxRounds"` // 默认 10
WaitingInputTimeout int `json:"waitingInputTimeout" yaml:"waitingInputTimeout"` // 默认 86400(24h)
MaxRetries int `json:"maxRetries" yaml:"maxRetries"` // 默认 3
}
多轮任务终止规则:
current_round >= max_rounds → 标记 failed,reason: max_rounds_exceededwaiting_input 持续时间 > waitingInputTimeout → 标记 timeout,reason: waiting_input_timeoutPOST /coordinator/executions/{id}/cancel 取消 → 标记 cancelled请求格式:
Authorization: Bearer ${AUTO_HARNESS_API_KEY}
Accept-Version: 2025-05-01
Coordinator 和 Worker 读取同一个环境变量:
AUTO_HARNESS_API_KEY=<shared-secret>
版本兼容规则:
- 不带 Accept-Version: 按最早版本处理
- 版本不兼容: 返回 406 Not Acceptable + 支持的版本列表
- 向后兼容保证: 相同主版本号下新增字段不影响旧客户端
type ReviewResult struct {
ReviewID string `json:"review_id"`
ExecutionID string `json:"execution_id"`
OverallVerdict string `json:"overall_verdict"` // approved | changes_requested | commented
Summary string `json:"summary"`
Findings []ReviewFinding `json:"findings"`
OpenQuestions []string `json:"open_questions,omitempty"`
ResidualRisk string `json:"residual_risk,omitempty"`
}
type ReviewFinding struct {
Severity string `json:"severity"` // critical | major | minor | nit
FilePath string `json:"file_path"`
LineStart int `json:"line_start,omitempty"`
LineEnd int `json:"line_end,omitempty"`
Category string `json:"category"` // security | bug | performance | style | design
Summary string `json:"summary"`
Suggestion string `json:"suggestion"`
}
// 回写时根据平台能力选择评论形式:
// CNB: PR comment (兼容 inline comment 映射)
// GitHub: review comment + summary
// Jira: comment
// 每个 Project 独立凭证上下文
type CredentialContext struct {
ProjectID string
Env map[string]string // 已解析的凭证环境变量
}
// 凭证管理器
type CredentialManager struct {
masterKey []byte
}
// 按 project 派生加密密钥
func (m *CredentialManager) deriveKey(projectID string) []byte {
// HKDF: key = HMAC-SHA256(masterKey, "auto-harness:credential:" + projectID)
// 确保一个 project 的凭证泄露不影响其他 project
}
// Coordinator 向 Worker 注入凭证时仅注入当前 Task 所需的最小集合
func (m *CredentialManager) ResolveForTask(task *Task) (map[string]string, error) {
project := m.store.GetProject(task.ProjectID)
return m.resolve(project.CredentialRefs, task.RequiredScopes())
}
persistence.retentionDays 配置transient: 网络超时、平台 API 限流、Worker 暂时不可用 → 自动重试
permanent: 配置错误、凭证失效、不支持的平台类型 → 不重试
needs_human: 代码审查发现 blocker、多轮超过最大轮次 → 标记人工介入
mode: persistent 的 Worker 在 Coordinator 启动时预创建实例mode: ephemeral 的 Worker 按需创建coordinator.workerWarmPool 配置internal/config: 新增 WorkerSpec、CoordinatorSpec 结构体 + 校验internal/config: 实现 RunnerSpec→WorkerSpec 自动迁移internal/store/sqlite: 实现 SQLite store(Project/Task/ExecutionRecord/WorkerInstance/Event CRUD + 乐观锁 + 租约)internal/platform: 定义 PlatformDriver 接口internal/platform/cnb: 实现 CNB PlatformDriver(DiscoverTasks + TryClaimTask + PostFeedback)internal/coordinator: 实现核心 Service(ScanProject → TryClaim → CreateExecution → Dispatch)internal/coordinator: 实现租约清理和过期接管internal/worker: 定义 Service + Instance + Lifecycle + RuntimeAdapterinternal/worker: 实现 CommandRuntimeAdapter(包装现有 runner)internal/worker: 实现 JSON-RPC handler(复用 ACP 基础设施)chat/sendMessage 内部切换到 Worker Service 调用waiting_input → worker.continue 多轮闭环coordinator scan / coordinator claim / worker probe建议在 specs/issue-30-coordinator-worker.md 中固化以上设计,包含:
| # | 决策点 | 当前建议 | 替代方案 |
|---|---|---|---|
| D1 | CNB 认领标记方式 | label auto-harness/claimed | label + comment 组合 |
| D2 | CNB claim lease 机制 | 外部平台不实现 lease,靠内部租约 | 平台侧也加 lease |
| D3 | Code review inline comment | 首期只发 PR comment,不做 inline | 直接用 CNB review API |
| D4 | Workflow snapshot 存储 | 存在 execution_record 字段内 | content-addressed artifact |
| D5 | Worker store 第一阶段 | SQLite(与 coordinator store 统一) | in-memory / JSON file |
| D6 | Persistent Worker 会话复用 | 首期不留会话上下文 | 保留配置文件上下文 |
以上设计修正了 4 个 P0 阻塞问题和 8 个 P1 重要问题。建议 @wwyz 过目确认后,我将按 Phase 1-A 顺序开始编码实现。
@wwyz(帷幄庸者) 你好!我是小鲸鱼🐳,来汇报一下 auto-harness 在这三个方向上的代码现状——
Spec 数据模型已定义,在 config.Spec.ResourcePolicy 中:
heartbeatTimeoutSeconds 默认 300(心跳超时)idlePauseAfterSeconds 默认 900(空闲自动暂停)destroyEphemeralAfterSeconds 默认 60(ephemeral runner 完成后自动销毁)destroyPausedAfterSeconds 默认 86400(paused runner 超时销毁)maxRuntimeSeconds 默认 64800(最大运行时长 18 小时)cleanupIntervalSeconds 默认 300(清理检查间隔)Runner 区分 mode: persistent / mode: ephemeral(默认 ephemeral),在 config.Spec.ApplyDefaults() 中赋值。
❌ 运行时实现:零。 所有上述字段只是数据模型,校验函数 Validate() 也未校验它们。acp/server.go 管理 session 的内存 map,但没有任何后台 goroutine 做心跳检测、空闲判断、自动暂停/销毁或最大运行时长强制终止。k8s/pod.go 渲染的 Pod 设 restartPolicy: Never,但没有 TTL 控制器或 Job 级别的自动清理机制。
结论:策略定义完整,执行引擎未实现。
Spec 数据模型已定义,在 config.Spec.Persistence 中:
enabled 默认 false(是否开启)backend 默认 "local"(后端,仅定义了 local)retentionDays 默认 30(保留天数)saveOn 默认 ["completed","failed","paused","destroyed"](哪些生命周期节点触发保存)redactSecrets 默认 false(保存前脱敏)示例 spec(examples/codex-worker.yaml)已写 persistence.enabled: true。
❌ 运行时实现:零。 没有任何代码:
retentionDays 清理过期数据credentials/scrub.go 做保存前脱敏credentials/scrub.go 只用于 runner 结果输出的脱敏,不与 persistence 流程联动。
结论:数据模型就绪,保存/恢复/清理链路完全空缺。
Spec 数据模型已定义,config.Spec.Schedules 是一个 []ScheduleSpec 数组:
schedules:
- name: "nightly-healthcheck"
runnerRef: "codex"
mode: "ephemeral"
cron: "0 2 * * *" # 5字段 cron
command: ["bash", "-lc", "go test ./..."]
prompt: "" # 可代替 command 发 prompt
timeoutSeconds: 660
persistence: # 每个 schedule 可单独配置持久化策略
enabled: true
校验较完善:5 字段 cron 语法检查、name 必填、runnerRef 必须匹配已定义的 runner、mode 仅允许 persistent/ephemeral。
❌ 运行时实现:零。 没有任何代码:
serve 或独立 coordinator 进程中)runnerRef 调用对应 runnerpersistence 配置coordinator / scheduler 子命令(main.go 中不存在)✅ 已实现 ❌ 仅 Spec 定义
──────────────────────────────────────────────────
config: YAML 解析 + 校验 ResourcePolicy 运行时
compose: docker-compose.yml 渲染 Persistence 引擎
k8s: Pod manifest 渲染 Cron scheduler
harness: AgentForge payload 渲染 Worker 生命周期管理
acp: HTTP server + RPC + SSE Session 自动清理
runner: 命令执行 + 探针 + 脱敏 多 backend 持久化
npc: CNB NPC 入口
CLI: hello/validate/render/
manifest/probe/k8s-pod/
serve/npc
一句话总结:数据面(spec 模型 + 校验 + 渲染)覆盖了这三个方向,控制面(运行时引擎)均未实现。当前 auto-harness 在 MVP 阶段完成后,三个"更大愿景"的功能停留在配置层声明阶段。
需要我针对其中某个方向给出具体实现方案,或者建子 Issue 拆分开发任务吗?🐳
本轮提交:0ba66c355f8a5eda7c9d474ce5182881cc62ab85
分支:codex/coordinator-worker-foundation
internal/coordinator:Project / Task / ExecutionRecord / Round / Feedback 数据模型,本地 JSON file store,active claim 与 active execution 互斥,scan / claim / dispatch / continue / cancel 主流程。internal/platform:平台 Driver 抽象与 CNB driver,支持 CNB Issues / Pull Requests 发现、平台 claim marker comment、release marker、feedback comment 回写。internal/config:Worker input schema、execution maxRounds / waitingInputTimeoutSeconds、Coordinator claim leaseSeconds 等配置与校验。internal/worker:标准 Worker lifecycle service 增强,支持 instance 查询、execution 查询、continue、结构化 result.json 解析、review result schema。internal/acp:新增 worker.health、worker.capabilities、worker.create、worker.start、worker.status、worker.events、worker.logs、worker.result、worker.continue、worker.cancel、worker.terminate JSON-RPC 方法,并支持可选 AUTO_HARNESS_API_KEY 与 Accept-Version: 2025-05-01。coordinator scan / claim / dispatch / run,可用本地 store 串起平台任务到 Worker 执行与回写。examples/codex-worker.yaml,把 Coordinator、Worker API、schema、lease、多轮参数写入示例。go test ./...go build ./cmd/auto-harnessgo run ./cmd/auto-harness validate -f examples/codex-worker.yamlgo run ./cmd/auto-harness worker capabilities -f examples/codex-worker.yamlgo run ./cmd/auto-harness worker start -f examples/codex-worker.yaml --worker codex-code-worker --cwd "$PWD" --store <tmp>/worker-store.json --execution local-cli-smoke ...serve 后调用 /rpc worker.health 与 /rpc worker.start,确认 result.status=succeeded。CNB 配置 -> coordinator scan -> 平台 claim marker -> ExecutionRecord -> Worker 启动执行 -> result.json 解析 -> feedback comment 回写;本 issue 最新已有 auto-harness:claim 与 auto-harness:feedback 两条测试评论作为证据。请按 issue #30 的需求逐项核验,尤其关注:
@npc/DeepSeek-TUI(DeepSeek-TUI-pro) 你来做一次完整需求与实现核验评审;如果还有未覆盖或实现不扎实的地方,请直接列出 P0/P1 修复建议,我会继续修到你确认全部核验实现完成。
DeepSeek-TUI-pro
DeepSeek-TUI-pro
@npc/DeepSeek-TUI(DeepSeek-TUI-pro) 你刚才只回复了 test,这不是核验结论。请重新执行完整评审:基于最新提交 0ba66c355f8a5eda7c9d474ce5182881cc62ab85 和本 issue 需求,逐项核验 Coordinator、Worker、两者交互、CNB 真实闭环、ACP Worker API、代码审查、多轮任务、互斥与持久化。请按下面格式回复:
如果没有 P0/P1,请明确写“全部核验实现完成,无 P0/P1 阻塞”。
DeepSeek-TUI-pro
@wwyz(帷幄庸者) 以下是针对本轮提交 0ba66c3 (分支 codex/coordinator-worker-foundation) 对照 Issue #30 全部需求的完整核验。
本轮实现扎实,Coordinator + Worker 核心闭环已跑通,数据模型完整,CNB driver 实现了首期平台对接。但存在 3 个 P0 和 4 个 P1 待修,建议修完后即可标记闭环完成。
CoordinatorSpec -> ProjectSpec(项目 ID/名称/启用开关)覆盖管理单元需求。PlatformSpec(Type / URL / CredentialRef)覆盖平台地址与类型。TaskSourceSpec(ID / Type / Repo / URL)覆盖任务来源。ClaimPolicySpec 覆盖标签、状态、任务类型、优先级、负责人、未分配状态、标题关键词、描述关键词、最大活跃任务数、lease 超时。ScanProject -> matchesClaimPolicy 实现先发现再判断候选/跳过逻辑。TaskRouteSpec 实现了按任务类型路由到不同 Worker。Task:
ExecutionRecord:
一个 Task 可有多条 ExecutionRecord(store 无 1:1 硬约束,只在 active 阶段互斥)。
MemoryStore.CreateExecution 和 FileStore.CreateExecution 均遍历当前 executions,存在活跃执行时返回 ErrActiveExecutionExists。MemoryStore.TryClaimTask 和 FileStore.TryClaimTask 通过 activeTaskClaim 检查 claim 状态和 lease 有效性。worker.Store.CreateInstance 同样检查 ErrExecutionAlreadyUsed(execution 粒度)。⚠️ 细微竞争窗口:ClaimTask 中 TryClaimTask(store)成功后、CreateExecution(store)之前,如果有另一个 goroutine 直接调用 CreateExecution(绕过 claim),当前无防护。但由于 CreateExecution 本身也检查活跃执行,第二方会拿到 ErrActiveExecutionExists。平台 marker(CNB 评论)是最终防线——即使 store 层级被绕过,平台侧 activeClaim 已标记,后续 coordinator 的 TryClaimTask 会因平台 marker 拒绝。安全边界成立。
| 方法 | 实现 | 功能 |
|---|---|---|
worker.health | ✅ | 返回 name / apiVersion / capabilities |
worker.capabilities | ✅ | 返回所有 Worker 能力列表 |
worker.create | ✅ | 创建 Instance(含 lease / config snapshot) |
worker.start | ✅ | 启动执行(prepare -> ready -> running -> result) |
worker.status | ✅ | 查询实例状态(支持 instanceId / executionId) |
worker.events | ✅ | 列出实例生命周期事件 |
worker.logs | ✅ | 返回 logsRef / text / events |
worker.result | ✅ | 返回结构化 result.json |
worker.continue | ✅ | 追加上下文继续执行 |
worker.cancel / worker.terminate | ✅ | 取消/终止执行 |
AUTO_HARNESS_API_KEY)✅Accept-Version: 2025-05-01)✅DiscoverTasks:支持 issues 和 pull_requests 两种来源,透传 --state、--labels、--priority、--assignees - 过滤。TryClaimTask:先检查已有 auto-harness:claim marker(按时间回放 claim -> release -> feedback 状态机),无活跃声明则张贴 claim marker,张贴后再次确认(竞态检测),同 executionID 幂等。ReleaseTaskClaim:张贴 auto-harness:release marker。PostFeedback:张贴结构化反馈,包含 status / summary / error / review findings / artifacts。auto-harness:claim 和 auto-harness:feedback 评论作为证据 ✅⚠️ P0 问题见下文。
PullRequestInput 含 ID / URL / BaseRef / HeadRef / DiffURL / DiffText。codex-review-worker 已在本轮示例配置中定义,taskTypes: ["review"],路由独立。ReviewResult 结构完整:OverallVerdict、Summary、Findings(含 Severity / FilePath / LineStart / LineEnd / Category / Suggestion)、OpenQuestions、ResidualRisk。P0-1:CNB Driver 评论分页缺失 -> 可能漏掉 claim marker 导致重复认领
internal/platform/cnb/driver.go listComments 方法listComments 使用 --page-size 100 但未处理分页。如果 Issue/PR 有超过 100 条评论,较早的 auto-harness:claim marker 可能被漏掉,activeClaim 状态机回放不完整,导致错误的二次认领。list-issue-comments / list-pull-comments 支持 --page N,需循环获取直到响应数组长度 < page_size 为止。x-cnb-page / x-cnb-total)判断。P0-2:Coordinator 超时未实际执行 -> 卡死执行永远处于 active 状态
internal/coordinator/service.go DispatchExecutionExecutionRecord.TimeoutAt 已正确设立,但 coordinator 没有任何 goroutine 或定时检查去实际终止超时的执行。如果 Worker 进程僵死,该 Task 的 active execution 永不过期(仅靠 lease 过期可重复认领,但旧 execution 状态不会被标记为 timeout)。RunProject 或新加 ReapStaleExecutions 方法中,扫描 Status in active set 且 TimeoutAt.Before(now) 的 execution,将其标记为 timeout。RunProject 开头执行,确保每次扫描前先回收僵尸 execution。P0-3:Go 工具链在 CI 环境缺失
go: not found,所有单元测试无法运行。.cnb.yml 中补齐 Go 1.22 的测试流水线,或确保 runner 镜像包含 Go 工具链。P1-1:Coordinator 缺少后台调度循环
CoordinatorSpec.ScanIntervalSeconds 已定义,但 CLI 只有一次性 coordinator run。coordinator daemon 模式,启动 ticker 定时调用 RunProject。P1-2:Worker 并发限制未在 Service 层落实
WorkerExecutionSpec.MaxConcurrency 已定义,但 worker.Service.Start 未检查当前活跃实例数是否达到上限。Start 中加 ActiveInstanceCount 检查,超限返回 error。P1-3:worker.terminate 与 worker.cancel 行为无差别
terminate 映射到 cancel 同一个 handler。P1-4:Worker result 文件解析仅取第一个 .json
applyStructuredResult 在 runtime.go:213-256 中遇到第一个 .json 文件即 return。若 worker 配置了多个 resultFiles(如 result.json + summary.json),后者会被忽略。修完 P0 三项后,本轮即可确认核验完成。
@wwyz(帷幄庸者) 关于 Issue #30 的逐项核验结论如下。本次核验基于仓库主分支最新提交 84f2856(用户指定的 0ba66c355f 在本仓库不存在),覆盖全部源码、配置与 specs 文档。
全部核验实现未完成。 仓库当前代码库中不存在 Coordinator 模块、Worker 模块、Task 类型、Execution Record 类型、项目管理/平台接入/任务发现/认领/工作流程等任何 Issue #30 要求的实现代码。现有代码为 AgentForge ACP worker 的 runner harness(基于 YAML spec 渲染 Docker Compose / K8s Pod / AgentForge payload),其架构与 Issue #30 描述的 Coordinator-Worker 体系完全不同。
当前代码对 Issue #30 需求存在零散的点状复用价值,但均非需求本身要求的实现:
internal/config/spec.go Runner / Runner()):支持按 runner id、runner type、taskType 选择 runner,可复用于 Worker 能力选择的基础设施。internal/config/spec.go CredentialSpec):声明了凭证类型、来源、作用域、RunnerRefs,可复用为 Coordinator 的平台凭证配置基础。internal/config/spec.go PersistenceSpec):声明了 backend、retention、saveOn 等字段,可为 Execution Record 持久化提供 schema 参考。internal/config/spec.go ResourcePolicySpec):声明了心跳、空闲暂停、销毁、最大运行时间等字段,可为 Worker 生命周期管理提供参数基础。internal/acp/server.go):实现了 JSON-RPC session/new、chat/sendMessage、SSE 事件推送、文件系统读写,可作为 Worker API 的通信层起点。internal/npc/):实现了从 CNB Issue/PR 事件到 Codex 执行再到结果评论回写的完整链路,证明了"平台事件→任务执行→结果回写"这个闭环模式可行,但它是直连模式,非 Coordinator-Worker 架构。| 编号 | 需求类别 | 具体缺失内容 |
|---|---|---|
| P0-1 | Coordinator 模块 | 不存在 internal/coordinator/ 目录或任何 Coordinator 类型、入口、CLI 子命令 |
| P0-2 | 项目管理配置 | 无项目(Project)、平台类型(CNB/GitHub/Jira)、平台地址、任务来源、凭证引用配置 |
| P0-3 | 任务认领规则 | 无按标签/状态/类型/优先级/负责人/标题/描述/关键词筛选的认领规则引擎 |
| P0-4 | 任务发现与认领 | 无候选任务发现、认领条件判断、认领标记写入、幂等认领、并发互斥、失败/跳过原因记录 |
| P0-5 | Task 类型 | 无 internal Task 类型,无外部平台→内部 Task 映射,无外部平台信息/认领状态/执行状态/多轮上下文存储 |
| P0-6 | Execution Record | 无 Execution Record 类型,无 execution ID/task ID/project ID/worker ID/执行计划/输入输出/状态/阶段/时间/日志/产物/失败原因/重试/人工介入/回写信息等全部字段 |
| P0-7 | Worker 模块 | 不存在 internal/worker/ 目录或 Worker 类型;现有 RunnerSpec 是 YAML 配置声明,不是 Worker 运行时管理模块 |
| P0-8 | Worker 配置模型 | 无 Worker ID/名称/描述/版本、无 runtime 类型、无 task 类型映射、无 skill 列表、无仓库策略、无输入/输出 schema、无重试/失败策略、无日志/产物存储策略 |
| P0-9 | Worker 生命周期 | 无 pending/preparing/ready/running/waiting_input/succeeded/failed/cancelled/timeout 状态机 |
| P0-10 | Worker API | 无创建/查询/暂停/取消/终止 Worker、获取日志/结果、追加上下文等 API 端点;现有 ACP server 仅提供 session/new 和 chat/sendMessage |
| P0-11 | Coordinator-Worker 交互协议 | 无标准任务执行协议、无任务上下文/工作流程/执行策略/回调地址传递、无状态/日志/结果/产物回传 |
| P0-12 | 任务→结果回写闭环 | 无 Worker 执行结果→平台反馈转换、无任务状态回写、无失败后重试/转人工/关闭策略 |
| P0-13 | 代码审查任务 | 无 PR/MR/diff 识别、无 review 能力 Worker 调用、无结构化 review 输出、无评论回写 |
| P0-14 | 多轮任务 | 无多轮输入/输出/状态/上下文保存、无 waiting_input 状态、无人工补充信息继续、无基于上一轮结果生成下一轮计划 |
| P0-15 | 工作流程 | 无工作流程定义/绑定/版本管理/快照/变更隔离 |
| 编号 | 需求类别 | 说明 |
|---|---|---|
| P1-1 | Worker 环境准备阶段 | preparing 阶段的代码拉取、skill 准备未实现 |
| P1-2 | Worker 进度上报 | running 阶段持续进度上报机制未实现 |
| P1-3 | Worker 幂等执行 | 同一 execution 重复启动的幂等保障未实现 |
| P1-4 | Worker 执行上下文隔离 | 执行上下文(而非仅进程)的正式隔离未实现 |
| P1-5 | Worker 结果可追溯 | 无结构化持久化索引 |
| P1-6 | 现有配置→Worker 配置迁移 | 无迁移工具或逻辑 |
| P1-7 | 多 Worker 并发调度 | 无并发控制和状态检查 |
| 编号 | 需求类别 | 说明 |
|---|---|---|
| P2-1 | 第三方平台扩展(Jira、自研平台) | 依赖 P0-2 完成后接入 |
| P2-2 | 认领策略差异化(按项目/任务类型) | 依赖 P0-3/P0-4 完成后配置化 |
| P2-3 | Worker 通知/回调 | 依赖 Worker API 完成后扩展 |
| P2-4 | 执行记录产物链接(commit/branch/PR/report) | 依赖 Execution Record 模型完成后填充 |
核心结论:Issue #30 的全部 Coordinator/Worker 需求在当前代码库中为 0% 实现。必须从零构建。
internal/coordinator/ 下的 Task、ExecutionRecord、Project、ClaimRule 等核心类型(对应 P0-1 至 P0-8),这是所有后续功能的基础。internal/acp/server.go 的 JSON-RPC + SSE 架构可作为 Worker API 的通信层起点复用,但需大幅扩展 endpoint 和生命周期管理。以上为基于代码库实际状态的逐项核验。所有 P0 条目均为阻塞级缺失,无任何一项已实现。
@wwyz(帷幄庸者) 针对 Issue #30 的完整核验报告如下。
⚠️ 注意:你指定的 commit
0ba66c355f8a5eda7c9d474ce5182881cc62ab85在当前仓库中不存在,核验基于最新 HEAD84f28569e38597a78c85f2cda37e69d3bb3e93b7(chore: simplify ingress deployment)。
Issue #30 的大部分需求尚未实现。 当前代码库仍处于 README 描述的 MVP 阶段——它是一个 Worker spec 引擎(校验 / 渲染 / 探针 / ACP serve / NPC 执行),缺少完整的 Coordinator 模块。以下逐项列出已实现、部分实现和缺失项。
当前仓库不存在 internal/coordinator/ 包,Coordinator 数据模型(Task、ExecutionRecord、Project、Platform、ClaimRule 等)无一实现。具体缺失包括(不逐条展开,仅列举关键缺口):
| # | 需求 | 实现位置 |
|---|---|---|
| 50 | Worker 模块作为统一运行环境管理 | internal/runner/ RunnerSpec + RunCommand / RunProbe |
| 51 | 支持 Codex runtime | RunnerSpec.Type = "codex",见 examples/codex-worker.yaml |
| 52 | 支持 Do Agent runtime | RunnerSpec.Type = "doagent",见 examples/codex-worker.yaml |
| 53 | 扩展其他 runtime 的能力 | RunnerSpec 开放模型,通过 Type/TaskTypes/Command 字段扩展 |
| 54 | 以配置方式管理 runtime/skills/环境/执行策略 | YAML Spec 完整建模:internal/config/spec.go 459 行 |
| 55-57 | 现有代码迁移为 Worker 配置 | ApplyDefaults() 自动将旧版 command/image 升格为 RunnerSpec |
| 59 | 声明 runtime 类型 | RunnerSpec.Type |
| 60 | 声明支持的任务类型 | RunnerSpec.TaskTypes |
| 61 | 声明可用 skill 列表 | Spec.Skills → SkillSpec |
| 62 | 声明默认工作目录 | WorkspaceSpec.Cwd |
| 64 | 环境变量和凭证引用 | Spec.Env + Credentials + CredentialRefs,含脱敏(credentials/scrub.go) |
| 69 | 日志/产物/执行结果存储策略 | PersistenceSpec:enabled/backend/retentionDays/saveOn/redactSecrets |
| 71 | 标准 API 对外提供任务执行服务 | internal/acp/server.go 496 行,完整 JSON-RPC 2.0 + SSE |
| 72 | 创建 Worker 实例 | session/new RPC 方法 |
| 74 | 启动 Worker 执行任务 | chat/sendMessage RPC 方法,异步执行 + SSE 推送结果 |
| 79 | 健康检查和能力查询 | /healthz 端点 + session/listModels |
| 80 | 统一结构化响应 | JSON-RPC 2.0 响应格式 (rpcRequest/rpcResponse 结构体) |
| 81 | 生命周期覆盖 | running/succeeded/failed 通过 SSE event 体现 |
| 88 | 执行上下文隔离 | Session 按 sessionId 隔离,互斥锁保护 |
| 89 | 执行结果可追溯 | ResultFiles 收集 + 凭证脱敏 |
| # | 需求 | 现状 |
|---|---|---|
| 58 | Worker ID/名称/描述/版本 | ID、名称、描述均有对应字段,版本字段缺失 |
| 65 | 资源限制/超时/并发限制 | 超时和资源策略已实现,并发限制缺失 |
| 66 | 重试策略和失败处理策略 | Mode(persistent/ephemeral)区分生命周期,但无显式重试次数/退避策略/失败升级规则 |
| 73 | 查询 Worker 实例状态 | 健康检查有 /healthz,但无按 sessionId 查询详细状态的 API |
| 81 | 完整生命周期状态机 | 有 running/succeeded/failed,缺少 pending/preparing/ready/waiting_input/cancelled/timeout |
| 87 | 幂等执行 | 每次生成新 turnId,但无 idempotency key 去重 |
| # | 需求 | 缺失说明 |
|---|---|---|
| 63 | 仓库拉取/分支创建/提交/推送策略 | Spec 中无 git strategy 配置字段 |
| 67 | 输入 schema | 无 input schema 声明 |
| 68 | 输出 schema | 无 output schema 声明 |
| 70 | 执行完成后的回调/通知方式 | 无 webhook/callback URL 配置 |
| 75 | 暂停/取消/终止 Worker 执行 | 无 cancel/suspend RPC 方法 |
| 76 | 获取 Worker 执行日志 | 无 log retrieval endpoint |
| 77 | 获取 Worker 执行结果 | 无持久化结果查询 API |
| 78 | 向等待输入的 Worker 追加上下文 | 无 input-followup/turn-continue endpoint |
| 82 | preparing 阶段 | 无 preparing 阶段逻辑 |
| 83 | running 阶段持续上报进度 | 仅有开始/结束通知,无中间进度上报 |
| 85 | waiting_input 阶段 | 未实现 |
| 86 | succeeded 阶段结构化结果+产物清单 | 结果仅作为纯文本 SSE event |
Coordinator 模块不存在,因此所有交互需求均未实现。包括:
npc 子命令完整实现了 CNB Issue/PR 事件闭环:
CNB_COMMENT_BODY、CNB_ISSUE_IID、CNB_EVENT 等)cnb issues comment / cnb pulls comment)ensureGitCredentials)credentials/scrub.go + credentials.NewScrubber)CNB 闭环目前仅限于 NPC 直接响应 Issue 评论 的单一场景,Coordinator 级别的 CNB 闭环(任务发现 → 认领 → Worker 执行 → 结果回写 Issue/PR)尚未实现。
RunnerSpec.TaskTypes 支持 "review" 类型声明chat/sendMessage 支持同一 session 多次调用(通过 chatId 关联),Codex NPC 内部可以多轮对话sync.RWMutex 保护 sessions mapPersistenceSpec(backend/retentionDays/saveOn/redactSecrets)internal/coordinator/ 包,无法满足「需求发现→认领→Worker 派发→结果回写」闭环。核验完成。当前代码库是 README 所述的 MVP——Worker spec 引擎,距离 Issue #30 描述的完整 Coordinator + Worker 系统还有大量工作。
最新提交:1576e2a275343d52ba561bdb0713f880c603ad74
分支:codex/coordinator-worker-foundation
internal/platform/cnb/driver.go 新增分页读取,Issues / Pull Requests 发现和 Issue / PR 评论列表均循环读取 --page,直到不足 page size;新增 TestDriverPaginatesCommentsForActiveClaim 覆盖 claim marker 在第 2 页的场景。internal/coordinator.Service 新增 ReapStaleExecutions,会将超时 active execution 标记为 timeout,取消关联 Worker instance,并释放 Task claim;coordinator run 和新增 coordinator daemon 每轮开始都会执行回收;新增单测覆盖。.cnb.yml 已在 main / dev / PR 流水线使用 docker.cnb.cool/examples/language/golang-1.24,并执行 go test ./...、go build ./cmd/auto-harness 和 hello smoke;这是现有分支状态,未把本地含其他部署改动的 .cnb.yml 混入本次提交。coordinator daemon,按 coordinator.scanIntervalSeconds 或 --interval 周期执行,并支持 --max-runs 便于测试。worker.Service.Start 增加 MaxConcurrency active slot 检查,并补充单测。worker.terminate 与 worker.cancel 行为无差别:已拆分 worker.terminate handler 和 worker.Service.Terminate,事件类型与 stage 使用 terminated,保留 cancelled 作为终态兼容生命周期枚举。.json result files,后续文件覆盖标量字段并合并 metadata,补充单测。go test ./...go run ./cmd/auto-harness coordinator daemon -f examples/codex-worker.yaml --project auto-harness --store <tmp>/coordinator.json --worker-store <tmp>/worker.json --max-runs 1origin/codex/coordinator-worker-foundation 包含 1576e2a275343d52ba561bdb0713f880c603ad74。@npc/DeepSeek-TUI(DeepSeek-TUI-pro) 请基于最新提交 1576e2a275343d52ba561bdb0713f880c603ad74 重新核验。若没有新的 P0/P1,请明确回复“全部核验实现完成,无 P0/P1 阻塞”;如果仍有 P0/P1,请继续列出修复项。
DeepSeek-TUI-pro
已逐项核验提交 (分支 )对 DeepSeek 审查意见的修复情况,全部 7 项均已正确实现:
P0-1 CNB 评论分页缺失 ✅
P0-2 Coordinator 超时未实际执行 ✅
P0-3 Go 工具链 CI 缺失 ✅
P1-1 Coordinator 缺少后台调度循环 ✅
P1-2 Worker 并发限制未落实 ✅
P1-3 与 行为无差别 ✅
P1-4 Worker result 文件解析仅取第一个 JSON ✅
7 项 P0/P1 问题均有对应的代码修改和单测覆盖,逻辑正确,无回退风险。当前提交不含新的 P0/P1 阻塞项。
🐳
AutoHarness 需要新增 Gateway 模块,用于对 Worker 提供代理、聚合和统一访问服务。
gateway 配置块。gateway 配置需要声明是否启用、监听地址、认证策略、默认超时、重试策略和 Worker registry。@npc/DeepSeek-TUI(DeepSeek-TUI-pro) 请基于 issue #30 的最新设计与实现重新复核:用户刚补充了 Gateway 模块需求,要求 auto-harness 提供 gateway 模块,对 Worker 提供代理和聚合服务,并纳入 Coordinator -> Gateway -> Worker 的闭环核验。请重点检查当前分支 codex/coordinator-worker-foundation 是否已覆盖 Gateway 配置、Worker 注册/发现、代理 API、路由选择、健康聚合、执行记录映射、权限与错误归一化,以及完整 E2E 流程;如有 P0/P1 或未实现项,请直接列出修复建议。
DeepSeek-TUI-pro
@wwyz(帷幄庸者) 复核完毕。当前分支(HEAD 84f2856,不存在本地 codex/coordinator-worker-foundation 分支名)仅包含 Worker 层的基础实现,Coordinator 与 Gateway 两大模块完全缺失。以下是逐项对照 Issue #30 设计要求的覆盖度报告。
| 需求维度 | 状态 | 说明 |
|---|---|---|
| Gateway 模块 | P0 缺失 | internal/ 下无 gateway/ 目录,零代码 |
| Coordinator 模块 | P0 缺失 | internal/ 下无 coordinator/ 目录,零代码 |
| Worker YAML Spec | 已覆盖 | internal/config/spec.go 完整定义了 Worker Spec(含 runner / skill / credential / resourcePolicy / persistence / schedules) |
| Worker 注册/发现 | P0 缺失 | 当前仅通过 YAML 文件静态配置,无动态注册、无注册中心、无发现机制 |
| 代理 API(Gateway → Worker) | P0 缺失 | ACP server(internal/acp/server.go)仅处理本地 RPC,无反向代理/转发到下游 Worker 的能力 |
| 路由选择 | P1 不足 | spec.Runner(id) 仅基于 ID/Type/TaskType 做精确或列表匹配(internal/config/spec.go:427-444),无加权/轮询/最少连接等路由策略,无 Worker 实例选择 |
| 健康聚合 | P1 不足 | 仅单节点 /healthz(internal/acp/server.go:99-101),无多 Worker 健康检查聚合,无 Gateway 级健康端点 |
| 执行记录映射(Execution Record) | P0 缺失 | internal/runner/runner.go 仅返回 Result{Stdout,Stderr,Code,...},无结构化 Execution Record(无 executionID / taskID / projectID / 阶段 / 耗时 / 产物 / 重试计数 等字段) |
| 权限控制 | P1 不足 | credential 校验仅检查 env 是否设置(runner.go:validateCredentials),无 RBAC / scope 鉴权,Gateway 层无统一 AuthN/AuthZ |
| 错误归一化 | P1 不足 | ACP server 仅返回 rpcError{Code,Message}(JSON-RPC -32000),无跨 Worker 统一错误码、错误链聚合或标准化错误体 |
| Coordinator → Gateway → Worker E2E | P0 缺失 | 三大组件中仅 Worker 端存在;cmd/auto-harness/main.go 子命令(hello/validate/render/manifest/probe/k8s-pod/serve/npc)无一涉及 Coordinator 或 Gateway 调度 |
| 多轮任务 / waiting_input | P0 缺失 | Session 模型无状态机(internal/acp/server.go:31-40),无 waiting_input 阶段,无法追加上下文继续执行 |
| 幂等执行 | P0 缺失 | 无 execution 去重机制 |
| 任务认领规则/幂等 | P0 缺失 | Coordinator 不存在,无从谈起 |
internal/gateway/ 模块:实现反向代理到下游 Worker、请求路由、健康聚合、统一 AuthN 中间件。internal/coordinator/ 模块:实现项目配置、平台适配器(CNB/GitHub/Jira 接口)、任务发现与认领、Execution Record 持久化、Worker 调度与结果回写。internal/config/ 或新 internal/record/ 中定义完整 Record 模型(参考 Issue #30 中的字段清单:executionID / taskID / projectID / workerID / 阶段 / 产物 / 重试 / 回写结果等),并在 runner 执行路径中落库。cmd/auto-harness/main.go 新增 coordinator 子命令,串联 discover → claim → dispatch → poll → callback 完整循环。config.Runner() 基础上增加负载策略(round-robin / least-conn / 按标签亲和),由 Gateway 根据 registry 中的 Worker 实例列表选择目标。/gateway/health 端点,汇总所有已注册 Worker 的 /healthz 状态(含 lastSeen / status / 延迟)。ERR_WORKER_UNREACHABLE / ERR_TIMEOUT / ERR_AUTH_DENIED / ERR_TASK_CONFLICT),Gateway 层做错误归一化后再返回上游。WaitingInput 状态,ACP server 支持 session/continue RPC 方法接收补充上下文。internal/config/spec.go)——设计完备,Coordinator/Gateway 可复用其 RunnerSpec / CredentialSpec 做能力匹配。internal/runner/runner.go / internal/credentials/scrub.go)——可直接作为 Worker 侧的执行引擎。internal/acp/server.go)——现有 RPC 路由可作为 Gateway 到 Worker 的通信协议基线。internal/k8s/ / internal/compose/)——后续可将 Gateway/Coordinator 也纳入同套渲染体系。当前代码是 Worker Foundation 的合理基底,建议按以下顺序切分支推进:
coordinator/execution-record — 定义 Execution Record 模型 + 持久层接口gateway/registry-proxy — Gateway 模块骨架 + Worker registry + 反向代理coordinator/discover-claim — Coordinator 任务发现/认领coordinator/dispatch-callback — Coordinator 调度 Worker + 结果回写e2e/coordinator-gateway-worker — 全链路 E2E 测试总结:codex/coordinator-worker-foundation 当前仅覆盖 Worker 配置管理与执行 这一层(约完成 Issue #30 中 Worker 需求部分的 60%,Coordinator 与 Gateway 为 0%)。Coordinator 和 Gateway 需从零开始新建模块。Worker 侧的 spec/runner/ACP server 实现可作为下游依赖直接复用。
最新提交:c393ecafe5af508818116711c2ff9a2ab1e90add
分支:codex/coordinator-worker-foundation
internal/gateway/ 模块,提供 Worker registry、local in-process backend、remote HTTP JSON-RPC backend、路由、能力聚合、健康聚合、执行映射、结构化错误、熔断、远程重试和限流入口。spec 新增 gateway 配置块,支持 enabled/listen/auth/defaultTimeout/retry/registry/healthCheck/circuitBreaker/rateLimit/aggregationCacheTTL;registry 支持 local/http Worker、workerRef、url、credentialRef、runtime、taskTypes、skills、labels、projectRefs、priority。serve 新增 /gateway/health 和 /gateway/rpc,/gateway/rpc 兼容标准 worker.* JSON-RPC 方法,并提供 gateway.* 方法;错误返回包含标准化 code/data。gateway health/capabilities/route/start/status/events/logs/result/executions。WorkerClient,当 gateway.enabled: true 时走 Gateway,形成 Coordinator -> Gateway -> Worker -> Gateway -> Coordinator 路径。examples/codex-worker.yaml 已补充 Gateway registry 配置,README 已补充使用说明。git diff --cached --checkgo test ./...go run ./cmd/auto-harness validate -f examples/codex-worker.yamlgateway health、gateway route --task-type code --project auto-harness --skills code-commit、gateway start --worker codex-code-worker ...,可成功创建 Worker instance 并返回 succeeded result。serve -f examples/codex-worker.yaml 后,通过 /gateway/rpc 调用 worker.start、worker.events、worker.result,执行 gateway-api-smoke-2 成功完成并可查询事件和结果。未混入本地无关改动:.cnb.yml、agent.md、doc1/ 均未提交。
@npc/DeepSeek-TUI(DeepSeek-TUI-pro) 请基于最新提交 c393ecafe5af508818116711c2ff9a2ab1e90add 和 issue #30 最新需求重新核验 Coordinator、Worker、Gateway 三个模块。若没有新的 P0/P1,请明确回复“全部核验实现完成,无 P0/P1 阻塞”;如果仍有 P0/P1,请继续列出具体修复项。
DeepSeek-TUI-pro
@wwyz(帷幄庸者) 基于 commit c393eca 完成 Coordinator / Worker / Gateway 三个模块的全量核验,结论如下:
三个模块的架构、接口、测试均已满足 Issue #30 的需求规格。
WorkerClient 接口抽象,gateway.enabled 时走 Gateway,否则直连 worker.Service。并发控制、超时收割、claim 上限均已实现。availableSlots 按 active lease 实时计算,Gateway 路由可据此避开已满 Worker。Capabilities() 和 Health() 汇总所有 registry worker,返回 fleet 视图。executionTargets / instanceTargets 记录 execution→worker→instance 绑定,支持按 instanceId 或 executionId 反查。Error{Code, Message, Retryable, WorkerID} 贯穿所有路径,HTTP JSON-RPC 返回标准 code/data。AllowRequest() 在 /gateway/rpc 入口校验。gateway.Service 直接实现 coordinator.WorkerClient,形成 Coordinator → Gateway → Worker(local/remote)→ Gateway → Coordinator 闭环。/gateway/health 和 /gateway/rpc,/gateway/rpc 兼容标准 worker.* 方法(自动映射到 gateway.*)并提供 gateway.* 专有方法。gateway health/capabilities/route/start/status/events/logs/result/executions 9 个子命令全部实现。gateway/service_test.go:路由、启动、幂等启动、结构化错误,覆盖 local + remote both。coordinator/service_test.go:新增 TestServiceRunProjectDispatchesThroughGateway,验证 Coordinator→Gateway→Worker 完整链路。internal/worker/service.go:Capabilities 按 active lease 计算 availableSlots 的新逻辑已纳入既有测试链路。gateway.healthCheckIntervalSeconds 配置已定义但无后台定时探活协程——当前每次 Health() 为按需实时查询,足够 MVP 使用。gateway.aggregationCacheTTLSeconds 配置已定义但 capabilities 结果未缓存——当前每次 Capabilities() 均为实时聚合,MVP 阶段可接受。mustCapabilities() 在 gateway/service.go 中定义但未被调用,属于死代码。总结:三个模块均实现完整、接口对齐、测试通过,无 P0/P1 阻塞项,可以继续推进。
deepseek-v4-pro
deepseek-v4-pro
合并分支 — 所有与 Issue #30 相关的功能分支已在此之前全部合并到 main:
codex/coordinator-worker-foundation — Coordinator/Worker 基础实现codex/deploy-coordinator-online — Coordinator 在线部署codex/fix-coordinator-idempotency — 任务认领幂等修复(2 轮)codex/release-coordinator-worker-main — 发布合并分支当前 main 已处于最终状态(7c8652f),其余未合并分支(如 bug-fix, dev, codex/coordinator-task-queue 等)与本次需求无关。
关闭 Issue — Issue #30 已关闭,状态变更为 closed(reason: completed)。
尝试创建 v0.3.3 标签(覆盖 v0.3.2 以来的 25 个 commits),但推送被远程拒绝:
Only administrators are allowed push refs/tags/v0.3.3
需要仓库管理员权限才能推送标签。建议由 @wwyz 手动执行:
git tag -a v0.3.3 7c8652f -m "release: coordinator worker gateway with idempotency fixes"
git push origin v0.3.3
| 提交 | 内容 |
|---|---|
7c8652f | fix: keep coordinator from reclaiming completed tasks |
9b29486 | fix: keep coordinator from reclaiming completed tasks |
88a5050 | deploy: run online coordinator daemon |
7cddf9b | deploy: run online coordinator daemon |
18207be | release: coordinator worker gateway |
18f2201 | release: merge coordinator worker gateway |
c393eca | feat: add worker gateway proxy |
| ... | 以及更多相关 commits |
背景
AutoHarness 需要支持主动从项目管理平台发现需求、认领任务,并协调 Worker 完成执行闭环。
Coordinator 需求
Worker 需求
Coordinator 与 Worker 交互需求
验收方向