plugin/
├── plugin_core/                     # SDK
│   ├── pyproject.toml
│   └── src/libacg_plugin_core/
│       ├── __init__.py
│       ├── schemas.py
│       ├── context.py
│       ├── executor.py
│       ├── loader.py
│       └── cli.py
│
├── sample_tool/                     # tool plugin
│   ├── pyproject.toml
│   ├── __init__.py
│   ├── handlers.py                  # stage implementations
│   └── stages/
│       ├── scan.yml
│       ├── read.yml
│       └── copy.yml
│
├── sample_workflow/                 # self-contained workflow
│   ├── pyproject.toml
│   ├── __init__.py
│   ├── hooks.py
│   ├── stages/
│   └── jobs/
│
└── sample_workflow_with_tool/       # workflow that uses the tool plugin
    ├── pyproject.toml
    ├── __init__.py
    ├── hooks.py
    └── jobs/
A Tool Plugin provides reusable Stage implementations that multiple Workflows can reference.
sample_tool/
├── stages/
│   ├── scan.yml        # Stage definition
│   ├── read.yml
│   └── copy.yml
├── handlers.py         # Stage handler implementations
└── pyproject.toml      # entry_points registration
entry_points:
[project.entry-points."libacg.stages"]
"tool.scan" = "libacg_sample_tool:STAGE_HANDLERS"
"tool.read" = "libacg_sample_tool:STAGE_HANDLERS"
"tool.copy" = "libacg_sample_tool:STAGE_HANDLERS"
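Entries registered under the `libacg.stages` group can be read back at runtime with the standard library's `importlib.metadata`. The following is a minimal sketch, not the SDK's actual loader: the helper name `discover_stage_handlers` is hypothetical, and it assumes each entry point resolves to an importable object such as `STAGE_HANDLERS`.

```python
from importlib.metadata import entry_points


def discover_stage_handlers(group: str = "libacg.stages") -> dict:
    """Map each entry-point name in `group` to the object it points at.

    Hypothetical helper for illustration; the SDK's loader may differ.
    """
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+ returns a selectable collection.
        selected = eps.select(group=group)
    else:
        # Python 3.8/3.9 return a plain dict keyed by group name.
        selected = eps.get(group, [])

    handlers = {}
    for ep in selected:
        # ep.load() imports the module and returns the referenced attribute,
        # e.g. libacg_sample_tool.STAGE_HANDLERS for the name "tool.scan".
        handlers[ep.name] = ep.load()
    return handlers
```

Calling `discover_stage_handlers()` after the tool plugin is installed should return one entry per registered stage name; a group with no registrations yields an empty dict.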
A Workflow Plugin defines Job orchestration and can reference the Stages provided by a Tool Plugin.
sample_workflow_with_tool/
├── jobs/
│   └── sample_tool.yml   # Job definition
├── hooks.py              # custom hooks (optional)
└── pyproject.toml
entry_points:
[project.entry-points."libacg.jobs"]
sample-tool = "libacg_sample_workflow_tool:job"
[project.entry-points."libacg.hooks"]
"sample-tool.read:after" = "libacg_sample_workflow_tool:read_after"
name: tool.scan
description: Scan the files in a directory
input:
  path:
    type: path
    required: true
  patterns:
    type: string
    default: "*"
output:
  files:
    type: array
  count:
    type: number
config:
  recursive:
    type: boolean
    default: false
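To illustrate how the `required` and `default` fields of a Stage definition might be applied, here is a minimal sketch. It is an assumption about the semantics, not the SDK's validation code: `SCAN_STAGE` is a hand-written dict mirroring the `input` section of `tool.scan`, and `resolve_inputs` is a hypothetical helper.

```python
# Hypothetical in-memory form of the tool.scan definition; parsing the YAML
# file itself is out of scope for this sketch.
SCAN_STAGE = {
    "name": "tool.scan",
    "input": {
        "path": {"type": "path", "required": True},
        "patterns": {"type": "string", "default": "*"},
    },
}


def resolve_inputs(stage: dict, supplied: dict) -> dict:
    """Fill in defaults and reject unknown or missing required inputs."""
    spec = stage["input"]
    unknown = set(supplied) - set(spec)
    if unknown:
        raise ValueError(f"{stage['name']}: unknown input(s) {sorted(unknown)}")

    resolved = {}
    for field, rules in spec.items():
        if field in supplied:
            resolved[field] = supplied[field]
        elif "default" in rules:
            resolved[field] = rules["default"]
        elif rules.get("required", False):
            raise ValueError(f"{stage['name']}: missing required input '{field}'")
    return resolved
```

With this sketch, supplying only `path` picks up the `"*"` default for `patterns`, while omitting `path` raises a `ValueError`.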
name: sample-tool
description: Workflow that uses the tool plugin
params:
  input:
    type: path
    required: true
    cli_names: [-i, --input]
  output:
    type: path
    default: ./output
pipeline:
  - id: scan
    stage: tool.scan
    input:
      path: "{{params.input}}"
  - id: read
    stage: tool.read
    depends_on: [scan]
    input:
      file: "{{scan.files[0]}}"
    hooks:
      after: sample-tool.read:after
  - id: copy
    stage: tool.copy
    depends_on: [scan]
    input:
      source: "{{params.input}}"
      target: "{{params.output}}"
fail_strategy: stop
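The `{{params.input}}` and `{{scan.files[0]}}` placeholders suggest that each step's inputs are resolved against the job parameters and the outputs of earlier steps. The sketch below shows one way such resolution could work. The resolver, its function names, and the shape of `scope` are all assumptions made for illustration; the SDK may use a different template engine.

```python
import re

# A placeholder is {{ ... }} with no braces inside it.
_TEMPLATE = re.compile(r"\{\{\s*([^{}]+?)\s*\}\}")
# A path is a sequence of names and [index] accessors, e.g. scan.files[0].
_TOKEN = re.compile(r"([A-Za-z_][A-Za-z0-9_-]*)|\[(\d+)\]")


def lookup(expr: str, scope: dict):
    """Walk a dotted/indexed path such as 'scan.files[0]' through `scope`."""
    value = scope
    for name, index in _TOKEN.findall(expr):
        value = value[name] if name else value[int(index)]
    return value


def render(template: str, scope: dict):
    """Resolve {{...}} placeholders in `template` against `scope`."""
    whole = _TEMPLATE.fullmatch(template)
    if whole:
        # A lone placeholder keeps its native type (list, number, ...).
        return lookup(whole.group(1), scope)
    # Mixed text and placeholders are rendered to a string.
    return _TEMPLATE.sub(lambda m: str(lookup(m.group(1), scope)), template)


# Hypothetical scope: job params plus the output of the already-run scan step.
scope = {
    "params": {"input": "/data", "output": "./output"},
    "scan": {"files": ["a.txt", "b.txt"], "count": 2},
}
```

Under these assumptions, `render("{{scan.files[0]}}", scope)` returns `"a.txt"`, and a step can only be rendered once every step it references has produced output, which is what `depends_on` expresses.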
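import logging

# Assumed import path: StageContext is expected to live in the SDK's context.py.
from libacg_plugin_core.context import StageContext

logger = logging.getLogger(__name__)

async def read_after(ctx: StageContext):
    """Hook that runs after a file has been read."""
    content = ctx.output.get("content", "")
    logger.info("Read %d chars", len(content))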
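A hook like the one above can be exercised without the SDK by passing it a stand-in context. This is a sketch for local testing only: `FakeStageContext` and `run_hook` are hypothetical names, the stand-in models nothing but the `output` attribute the hook reads, and the hook is repeated here (returning the length) so the block is self-contained.

```python
import asyncio
import logging
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)


@dataclass
class FakeStageContext:
    """Hypothetical stand-in for the SDK's StageContext; only `output` is modelled."""
    output: dict = field(default_factory=dict)


async def read_after(ctx) -> int:
    """Same logic as the hook above, returning the length so it can be checked."""
    content = ctx.output.get("content", "")
    logger.info("Read %d chars", len(content))
    return len(content)


def run_hook(hook, ctx):
    """Drive an async hook to completion from synchronous code."""
    return asyncio.run(hook(ctx))
```

For example, `run_hook(read_after, FakeStageContext(output={"content": "hello"}))` returns `5`, and an empty context returns `0` because of the `""` fallback in `ctx.output.get`.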
# Install the SDK (all commands are run from the repository root)
pip install -e plugin/plugin_core
# Install the tool plugin
pip install -e plugin/sample_tool
# Install the workflow plugin
pip install -e plugin/sample_workflow_with_tool
# Run
acg-sample-tool --input /path/to/files --output /path/to/output
1. acg-sample-tool discovers the job (entry_points)
2. Load job.yml
3. Load the referenced stages (tool.scan, tool.read, tool.copy)
4. Discover the stage handlers from entry_points
5. Build the DAG and execute it
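The final step, building a DAG from `depends_on` and deriving an execution order, can be sketched with the standard library's `graphlib` (Python 3.9+). This is an illustration under assumptions rather than the SDK's executor: `PIPELINE` is a hand-written reduction of the sample job, and `execution_batches` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Pipeline steps reduced to the fields that matter for ordering.
PIPELINE = [
    {"id": "scan", "stage": "tool.scan"},
    {"id": "read", "stage": "tool.read", "depends_on": ["scan"]},
    {"id": "copy", "stage": "tool.copy", "depends_on": ["scan"]},
]


def execution_batches(pipeline: list) -> list:
    """Group step ids into batches; steps in one batch do not depend on each other."""
    # Map each step id to the set of step ids it depends on.
    graph = {step["id"]: set(step.get("depends_on", [])) for step in pipeline}
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError on circular depends_on

    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted for a deterministic result
        batches.append(ready)
        sorter.done(*ready)
    return batches
```

For the sample job this yields `[["scan"], ["copy", "read"]]`: `scan` runs first, after which `read` and `copy` are both ready and could run concurrently.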