plugin/
├── plugin_core/                      # SDK
│   ├── pyproject.toml
│   └── src/libacg_plugin_core/
│       ├── __init__.py
│       ├── schemas.py
│       ├── context.py
│       ├── executor.py
│       ├── loader.py
│       └── cli.py
│
├── sample_tool/                      # tool plugin
│   ├── pyproject.toml
│   ├── __init__.py
│   ├── handlers.py                   # stage implementations
│   └── stages/
│       ├── scan.yml
│       ├── read.yml
│       └── copy.yml
│
├── sample_workflow/                  # self-contained workflow
│   ├── pyproject.toml
│   ├── __init__.py
│   ├── hooks.py
│   ├── stages/
│   └── jobs/
│
└── sample_workflow_with_tool/        # workflow that uses the tool plugin
    ├── pyproject.toml
    ├── __init__.py
    ├── hooks.py
    └── jobs/
A Tool Plugin provides reusable Stage implementations that multiple Workflows can reference.
sample_tool/
├── stages/
│   ├── scan.yml        # Stage definition
│   ├── read.yml
│   └── copy.yml
├── handlers.py         # Stage handler implementations
└── pyproject.toml      # entry_points registration
entry_points:
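[project.entry-points."libacg.stages"]
# Names that contain dots must be quoted; a bare dotted key would create a nested TOML table.
"tool.scan" = "libacg_sample_tool:STAGE_HANDLERS"
"tool.read" = "libacg_sample_tool:STAGE_HANDLERS"
"tool.copy" = "libacg_sample_tool:STAGE_HANDLERS"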
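As a rough illustration of how these registrations could be consumed, the sketch below reads an entry-point group with the standard library's `importlib.metadata`. The helper names `collect_stage_handlers` and `installed_stage_entry_points` are hypothetical, and the code assumes `STAGE_HANDLERS` is a dict keyed by stage name; the source does not show its shape, so the SDK's real loader may work differently.

```python
from importlib.metadata import EntryPoint, entry_points


def collect_stage_handlers(eps):
    """Map each stage name to its handler.

    Assumes every entry point resolves to a mapping of {stage_name: handler},
    which is a guess at the shape of STAGE_HANDLERS.
    """
    handlers = {}
    for ep in eps:
        table = ep.load()  # imports the module and returns the named attribute
        if ep.name not in table:
            raise KeyError(f"{ep.value} has no handler for stage {ep.name!r}")
        handlers[ep.name] = table[ep.name]
    return handlers


def installed_stage_entry_points():
    """Entry points registered under 'libacg.stages' (keyword form needs Python 3.10+)."""
    return entry_points(group="libacg.stages")
```

With the tool plugin installed, `collect_stage_handlers(installed_stage_entry_points())` would return one handler per registered stage name. `EntryPoint` is imported only so that entry points can be constructed by hand when experimenting without an installed plugin.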
A Workflow Plugin defines Job orchestration and can reference Stages provided by a Tool Plugin.
sample_workflow_with_tool/
├── jobs/
│   └── sample_tool.yml   # Job definition
├── hooks.py              # custom hooks (optional)
└── pyproject.toml
entry_points:
[project.entry-points."libacg.jobs"]
sample-tool = "libacg_sample_workflow_tool:job"
[project.entry-points."libacg.hooks"]
# The dot and colon are not valid in a bare TOML key, so the hook name is quoted.
"sample-tool.read:after" = "libacg_sample_workflow_tool:read_after"
name: tool.scan
description: Scan the files in a directory
input:
  path:
    type: path
    required: true
  patterns:
    type: string
    default: "*"
output:
  files:
    type: array
  count:
    type: number
config:
  recursive:
    type: boolean
    default: false
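To make the schema above more concrete, here is a minimal sketch of a handler that would satisfy it. Everything in it is an assumption: the `StageContext` dataclass is a hypothetical stand-in for the SDK's context object, the handler signature is guessed from the hook example in this document, and `patterns` is treated as a single glob string.

```python
import asyncio
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class StageContext:
    """Hypothetical stand-in for the SDK's context object."""
    input: dict
    config: dict = field(default_factory=dict)
    output: dict = field(default_factory=dict)


async def scan(ctx: StageContext) -> dict:
    """List the files under input.path that match input.patterns."""
    root = Path(ctx.input["path"])
    pattern = ctx.input.get("patterns", "*")        # schema default: "*"
    recursive = ctx.config.get("recursive", False)  # schema default: false
    matches = root.rglob(pattern) if recursive else root.glob(pattern)
    files = sorted(str(p) for p in matches if p.is_file())
    ctx.output.update(files=files, count=len(files))
    return ctx.output


# Assumed shape: one dict shared by every "libacg.stages" entry point.
STAGE_HANDLERS = {"tool.scan": scan}
```

Calling `asyncio.run(scan(StageContext(input={"path": "."})))` returns a dict with the `files` and `count` keys declared in the `output` section.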
name: sample-tool
description: Workflow that uses the tool plugin
params:
  input:
    type: path
    required: true
    cli_names: [-i, --input]
  output:
    type: path
    default: ./output
pipeline:
  - id: scan
    stage: tool.scan
    input:
      path: "{{params.input}}"
  - id: read
    stage: tool.read
    depends_on: [scan]
    input:
      file: "{{scan.files[0]}}"
    hooks:
      after: sample-tool.read:after
  - id: copy
    stage: tool.copy
    depends_on: [scan]
    input:
      source: "{{params.input}}"
      target: "{{params.output}}"
fail_strategy: stop
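The `{{...}}` placeholders in the job definition refer either to job parameters or to the output of an earlier step. The source does not say which template engine the SDK uses, so the following is only a sketch of how such expressions could be resolved; `lookup` and `render` are hypothetical helpers, not SDK functions.

```python
import re

# A placeholder such as {{params.input}} or {{ scan.files[0] }}.
_EXPR = re.compile(r"\{\{\s*([^{}]*?)\s*\}\}")
# One path component: either a name or a [index] subscript.
_TOKEN = re.compile(r"([A-Za-z_][\w-]*)|\[(\d+)\]")


def lookup(expr: str, scope: dict):
    """Walk a dotted path with optional [index] parts, e.g. 'scan.files[0]'."""
    value = scope
    for name, index in _TOKEN.findall(expr):
        value = value[name] if name else value[int(index)]
    return value


def render(template: str, scope: dict):
    """Replace {{ ... }} placeholders with values taken from scope."""
    whole = _EXPR.fullmatch(template)
    if whole:  # a lone placeholder keeps the value's original type
        return lookup(whole.group(1), scope)
    return _EXPR.sub(lambda m: str(lookup(m.group(1), scope)), template)
```

Here `scope` would hold `params` plus one entry per finished step id, for example `{"params": {...}, "scan": {"files": [...], "count": 2}}`.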
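import logging
from libacg_plugin_core.context import StageContext  # assumed module path; adjust to the SDK's actual export
logger = logging.getLogger(__name__)

async def read_after(ctx: StageContext):
    """Hook that runs after the file has been read."""
    content = ctx.output.get("content", "")
    logger.info(f"Read {len(content)} chars")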
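For orientation, the sketch below shows one way an executor might await a stage handler and then its `after` hook. It is an assumption about the control flow, not the SDK's executor: `run_stage_with_hook` and `fake_read` are hypothetical, and a `SimpleNamespace` stands in for the real `StageContext`.

```python
import asyncio
import logging
from types import SimpleNamespace

logger = logging.getLogger("hooks-demo")


async def read_after(ctx):
    """Same body as the hook above, repeated so the sketch is self-contained."""
    content = ctx.output.get("content", "")
    logger.info("Read %d chars", len(content))


async def run_stage_with_hook(handler, after_hook, ctx):
    """Await the stage handler, store its result, then await the after-hook."""
    ctx.output = await handler(ctx)
    if after_hook is not None:
        await after_hook(ctx)
    return ctx.output


async def fake_read(ctx):
    # Stand-in for tool.read; the real handler is not shown in the source.
    return {"content": "hello"}


ctx = SimpleNamespace(input={"file": "a.txt"}, output={})
result = asyncio.run(run_stage_with_hook(fake_read, read_after, ctx))
```

Because the hook receives the context after the handler has finished, it can inspect `ctx.output` without changing the handler itself.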
# Install the SDK (run these commands from the directory that contains plugin/)
pip install -e ./plugin/plugin_core
# Install the tool plugin
pip install -e ./plugin/sample_tool
# Install the workflow plugin
pip install -e ./plugin/sample_workflow_with_tool
# Run
acg-sample-tool --input /path/to/files --output /path/to/output
1. acg-sample-tool discovers the job (via entry_points)
2. Loads job.yml
3. Loads the referenced stages (tool.scan, tool.read, tool.copy)
4. Discovers the stage handlers from entry_points
5. Builds the DAG and executes it
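Step 5 can be sketched with the standard library's `graphlib`. This is an illustration under stated assumptions rather than the SDK's executor: the `pipeline` list mirrors the `depends_on` fields of the sample job, `execution_batches` and `run` are hypothetical names, and handlers are assumed to be async callables that receive the results collected so far.

```python
import asyncio
from graphlib import TopologicalSorter  # Python 3.9+

pipeline = [
    {"id": "scan", "stage": "tool.scan"},
    {"id": "read", "stage": "tool.read", "depends_on": ["scan"]},
    {"id": "copy", "stage": "tool.copy", "depends_on": ["scan"]},
]


def execution_batches(steps):
    """Group step ids into batches whose dependencies are already done."""
    graph = {s["id"]: set(s.get("depends_on", [])) for s in steps}
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError on circular depends_on
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


async def run(steps, handlers):
    """Run each batch concurrently; handlers maps stage name -> async callable."""
    by_id = {s["id"]: s for s in steps}
    results = {}
    for batch in execution_batches(steps):
        outputs = await asyncio.gather(
            *(handlers[by_id[i]["stage"]](results) for i in batch)
        )
        results.update(zip(batch, outputs))
    return results
```

For the sample job, `scan` forms the first batch, and `read` and `copy` can run together in the second because both depend only on `scan`.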