本目录提供 cnb.cool/svn/stream 的渐进式示例,帮助开发者从零快速上手到生产部署。所有示例均遵循 Go 语言最佳实践,展示真实生产场景的代码实现。
| 序号 | 示例 | 说明 | 难度 |
|---|---|---|---|
| 01 | quickstart | 最小控制面链路 | 入门 |
| 02 | preset_override | Preset 与 Override 组合 | 入门 |
| 03 | governance_checkpoint | 治理能力(鉴权、审计、恢复) | 进阶 |
| 04 | fusion_fanout | Fusion FanOut 路由 | 进阶 |
| 05 | rtsp_to_rtmp_template | 生产模板 | 进阶 |
| 06 | persistent_checkpoint | 文件持久化 checkpoint | 进阶 |
| 07 | real_gst_pipeline_template | 真实 GStreamer 模板 | 进阶 |
| 08 | multi_session_scheduler | 多会话调度 | 进阶 |
| 09 | alerting_exporter | 事件与指标联动告警 | 高级 |
| 10 | blue_green_rebuild | 蓝绿重建/回滚演练 | 高级 |
| 11 | disaster_recovery_drill | 容灾演练 | 高级 |
| 12 | cloud_uplink_profiles | 上云预设矩阵 | 高级 |
| 13 | hardware_diagnostics | 硬件诊断 | 工具 |
| 14 | news_pip_composite | 新闻画中画 | 应用 |
| 18 | abr | 自适应码率 | 特性 |
| 19 | time_sync | 多路时间戳对齐 | 特性 |
| 20 | sei | SEI 消息注入 | 特性 |
| 21 | metadata_branding | VLC 品牌元数据 | 特性 |
| 22 | timeseries_aggregation | 时间序列聚合统计 | 特性 |
| 23 | seamless_switch | 无缝画面切换 | 应用 |
所有示例支持两种运行模式:
| 模式 | 环境变量 | 说明 |
|---|---|---|
| Mock | EXAMPLE_MODE=mock | 离线模式,使用模拟数据 |
| Real | EXAMPLE_MODE=real | 真实模式,连接实际设备 |
# 离线 smoke
EXAMPLE_MODE=mock go run ./examples/01_quickstart
# 真实链路
EXAMPLE_MODE=real \
INPUT_RTSP_URI="rtsp://camera/live" \
go run -tags gogst ./examples/01_quickstart
学习目标:掌握最基本的会话创建与生命周期管理
运行示例:
# Mock 模式
EXAMPLE_MODE=mock go run ./examples/01_quickstart
# Real 模式
EXAMPLE_MODE=real \
INPUT_RTSP_URI="rtsp://camera/live" \
go run -tags gogst ./examples/01_quickstart
核心代码:
// 创建 GStreamer 引擎
engine := stream.NewGoGSTEngine()
// 创建管理器(需要 Engine、Source、Sink 中的至少一个)
manager := stream.NewManager(engine, nil, nil)
defer manager.Close()
// 创建一键调用 API
api := stream.NewOneCallAPI(manager, stream.OneCallConfig{})
// 构建请求
req := stream.OneCallRequest{
ID: "demo-01",
URI: "rtsp://camera/live",
SourceType: stream.SourceTypeRTSP,
}
// 启动会话
result := api.StartSession(req)
if result.Error != nil {
panic(fmt.Sprintf("启动失败: %v", result.Error))
}
// 等待观察
time.Sleep(5 * time.Second)
// 停止会话
api.StopSession("demo-01")
关键概念:
Manager:会话生命周期管理器Engine:媒体处理引擎(GStreamer 实现)OneCallAPI:一键式 API,自动完成探测→选配→合并→创建Session:单个流的处理会话学习目标:理解预设模板与显式覆盖的优先级
运行示例:
EXAMPLE_MODE=mock go run ./examples/02_preset_override
配置优先级:系统默认 < 预设配置 < 自动推荐 < 用户显式覆盖
核心代码:
result := api.StartSession(stream.OneCallRequest{
ID: "preset-01",
URI: "rtsp://camera/live",
SourceType: stream.SourceTypeRTSP,
Preset: stream.PresetStorageArchiveH265Compact,
Override: stream.ProfileOverride{
Transcode: &stream.TranscodeSpec{
VideoCodec: "h265",
Bitrate: 1000000, // 显式覆盖码率
},
},
})
内置预设:
storage.archive.copy_prefer — 存储优先复制storage.archive.h265_compact — H.265 紧凑存储push.rtmp.low_latency — RTMP 低延迟推流push.srt.robust — SRT 稳健推流push.cloud.h265_uplink_economy — 上云经济型push.cloud.h265_uplink_balanced — 上云均衡型transcode.standard.h264_aac — 标准 H.264/AAC 转码inspection.ai_realtime — AI 实时检测学习目标:掌握鉴权、审计、检查点恢复
运行示例:
EXAMPLE_MODE=mock go run ./examples/03_governance_checkpoint
核心能力:
核心代码:
// 设置鉴权器
manager.SetAuthorizer(stream.AuthorizerFunc(func(ctx context.Context, op string, spec stream.SessionSpec) error {
if !strings.HasPrefix(spec.ID, "allowed-") {
return fmt.Errorf("会话 ID 必须以 'allowed-' 开头")
}
return nil
}))
// 设置审计器
manager.SetAuditor(stream.AuditorFunc(func(ctx context.Context, op string, spec stream.SessionSpec, err error) {
log.Printf("[AUDIT] op=%s id=%s err=%v", op, spec.ID, err)
}))
// 设置检查点存储
manager.SetCheckpointStore(stream.NewMemoryCheckpointStore())
// 启动会话
session, err := manager.StartSession(ctx, spec)
// 进程重启后恢复
recovered, err := manager.RecoverSessions(ctx)
学习目标:理解单输入多输出的路由模式
运行示例:
EXAMPLE_MODE=mock go run ./examples/04_fusion_fanout
应用场景:一路摄像头同时推送到多个服务(RTMP、录制、AI 分析)
核心代码:
spec := stream.SessionSpec{
ID: "fusion-demo",
Pipeline: stream.PipelineSpec{
Pull: stream.SourceSpec{
ID: "camera",
Type: stream.SourceTypeRTSP,
URI: "rtsp://camera/live",
},
},
Fusion: stream.FusionSpec{
Enabled: true,
FanOut: []stream.FusionFanOutTarget{
{ID: "rtmp", URI: "rtmp://server/live/stream"},
{ID: "file", URI: "file:///tmp/record.mp4"},
},
},
}
关键特性:
学习目标:掌握生产级 RTSP 到 RTMP 转推
运行示例:
# 预览模式(默认)
go run ./examples/05_rtsp_to_rtmp_template
# 真实模式
REAL_MODE=true \
INPUT_RTSP_URI="rtsp://camera/live" \
OUTPUT_URI="rtmp://media/live/stream" \
go run -tags gogst ./examples/05_rtsp_to_rtmp_template
关键参数:
| 参数 | 默认值 | 说明 |
|---|---|---|
OUTPUT_WIDTH | 1920 | 输出宽度 |
OUTPUT_HEIGHT | 1080 | 输出高度 |
OUTPUT_FPS | 25 | 输出帧率 |
OUTPUT_BITRATE_KBPS | 2500 | 目标码率 |
RUN_FOR | 12s | 运行时长 |
核心代码:
req := stream.VideoUnitRequest{
EncodeType: stream.VideoUnitEncodeH264,
OutputType: stream.VideoUnitOutputRTMP,
OutputURI: "rtmp://media/live/stream",
RTSPURL: "rtsp://camera/live",
TaskID: "rtmp-push-01",
Width: 1920,
Height: 1080,
FPS: 25,
LatencyMS: 200,
X26xExtra: stream.VideoUnitX26xExtra{
BPS: 2500 * 1000,
GOP: 50,
QPInit: 26,
QPMin: 26,
QPMax: 50,
RCMode: 0, // CBR
},
}
学习目标:掌握文件持久化与进程恢复
运行示例:
EXAMPLE_MODE=mock go run ./examples/06_persistent_checkpoint
检查点文件:/tmp/stream_checkpoints_example.json
核心代码:
// 创建文件检查点存储
store := stream.NewFileCheckpointStore("/tmp/stream_checkpoints_example.json")
manager.SetCheckpointStore(store)
// 启动会话
session, err := manager.StartSession(ctx, spec)
// 模拟进程重启
manager.Close()
// 创建新管理器并恢复
newManager := stream.NewManager(engine, nil, nil)
newManager.SetCheckpointStore(store)
recovered, err := newManager.RecoverSessions(ctx)
学习目标:理解真实 GStreamer 管道的构建与使用
环境检查:
bash ./examples/07_real_gst_pipeline_template/check_gst_env.sh
运行示例:
GST_ROOT="/workspace/app/dynamic/1.28.1" \
PKG_CONFIG_PATH="$GST_ROOT/lib/pkgconfig" \
CGO_CFLAGS="-I$GST_ROOT/include" \
CGO_LDFLAGS="-L$GST_ROOT/lib" \
LD_LIBRARY_PATH="$GST_ROOT/lib:$LD_LIBRARY_PATH" \
REAL_MODE=true \
INPUT_RTSP_URI="rtsp://camera/live" \
OUTPUT_URI="rtmp://media/live/stream" \
go run -tags gogst ./examples/07_real_gst_pipeline_template
核心代码:
// 使用 VideoUnit facade 封装 GStreamer 管道
result := stream.ValidateVideoUnitRequest(ctx, req)
if !result.Accepted {
log.Printf("配置验证失败: %s, 建议: %s", result.ErrorCode, result.NextAction)
return
}
unit, err := stream.NewVideoUnit(ctx, req, engine, nil)
if err != nil {
log.Printf("创建 VideoUnit 失败: %v", err)
return
}
// 启动处理
if err := unit.Start(); err != nil {
log.Printf("启动失败: %v", err)
return
}
学习目标:掌握批量会话启动与调度控制
运行示例:
# Mock 模式
EXAMPLE_MODE=mock go run ./examples/08_multi_session_scheduler
# Real 模式
EXAMPLE_MODE=real \
SCHED_INPUT_RTSP_TEMPLATE="rtsp://camera-%02d/live" \
SCHED_OUTPUT_URI_TEMPLATE="rtmp://media/live/scheduler-%02d" \
go run -tags gogst ./examples/08_multi_session_scheduler
调度参数:
| 参数 | 默认值 | 说明 |
|---|---|---|
SCHED_SESSION_COUNT | 6 | 会话数量 |
SCHED_MAX_CONCURRENT_STARTS | 2 | 并发启动上限 |
SCHED_START_THROTTLE | 150ms | 启动节流间隔 |
SCHED_START_RETRY_MAX | 4 | 最大重试次数 |
核心代码:
scheduler := stream.NewSessionStartScheduler(manager, stream.SessionStartSchedulerConfig{
MaxConcurrentStarts: 2,
StartThrottle: 150 * time.Millisecond,
RetryMax: 4,
})
for i := 0; i < sessionCount; i++ {
id := fmt.Sprintf("sched-%03d", i)
req := stream.OneCallRequest{
ID: id,
URI: fmt.Sprintf(inputTemplate, i),
SourceType: stream.SourceTypeRTSP,
}
scheduler.Schedule(ctx, id, req)
}
// 等待所有会话启动完成
results := scheduler.Wait(ctx)
学习目标:掌握事件与指标联动告警
运行示例:
EXAMPLE_MODE=real \
INPUT_RTSP_URI="rtsp://camera/live" \
OUTPUT_URI="rtmp://media/live/stream" \
go run -tags gogst ./examples/09_alerting_exporter
告警参数:
| 参数 | 默认值 | 说明 |
|---|---|---|
ALERT_RECONNECT_THRESHOLD | 2 | 重连告警阈值 |
ALERT_ERROR_EVENT_THRESHOLD | 3 | 错误告警阈值 |
ALERT_CRITICAL_EVENT_THRESHOLD | 1 | 严重告警阈值 |
核心代码:
// 订阅事件
events.Subscribe("alerter", stream.EventHandlerFunc(func(e stream.Event) {
switch e.Type {
case stream.EventReconnect:
reconnectCount.Add(1)
if reconnectCount.Load() >= alertReconnectThreshold {
triggerAlert("重连次数过多")
}
case stream.EventError:
if e.Severity == stream.SeverityCritical {
triggerAlert("严重错误")
}
case stream.EventHealthScore:
if score, ok := e.Data["score"].(int); ok && score < 50 {
triggerAlert("健康分过低")
}
}
}))
学习目标:掌握配置热更新与自动回滚
运行示例:
EXAMPLE_MODE=mock go run ./examples/10_blue_green_rebuild
流程:申请切换 → 观察验证 → 自动回滚(如失败)
核心代码:
// 获取当前规格
currentSpec := session.SpecSnapshot()
// 修改配置
newSpec := currentSpec
newSpec.Pipeline.Transcode.Bitrate = 5000000 // 提高码率
// 执行蓝绿重建
err := session.Rebuild(ctx, newSpec)
if err != nil {
// 重建失败,自动回滚到原配置
log.Printf("重建失败,回滚: %v", err)
session.Rebuild(ctx, currentSpec)
}
学习目标:完成完整的容灾流程演练
运行示例:
EXAMPLE_MODE=mock go run ./examples/11_disaster_recovery_drill
演练阶段:稳定运行 → 故障注入 → 退化检测 → 回滚恢复 → 验收
学习目标:了解上云场景的预设矩阵
运行示例:
EXAMPLE_MODE=mock go run ./examples/12_cloud_uplink_profiles
预设类型:
学习目标:诊断宿主硬件能力与插件状态
运行示例:
go run ./examples/13_hardware_diagnostics
输出内容:
核心代码:
// 执行硬件诊断
diagnostics := api.DiagnoseHardware()
// 打印能力快照
fmt.Printf("可用厂商: %v\n", diagnostics.Snapshot.AvailableVendors)
fmt.Printf("可用加速器: %v\n", diagnostics.Snapshot.AvailableAccelerators)
// 打印缺失能力
for _, vendor := range diagnostics.Vendors {
if len(vendor.MissingEncoders) > 0 {
fmt.Printf("厂商 %s 缺失编码器: %v\n", vendor.Vendor, vendor.MissingEncoders)
}
}
学习目标:实现新闻场景的主画面 + 嘉宾小窗
运行示例:
# 预览
go run ./examples/14_news_pip_composite
# 真实链路
EXAMPLE_MODE=real \
MAIN_RTSP_URI="rtsp://cam-main/live" \
PIP_RTSP_URI="rtsp://cam-guest/live" \
OUTPUT_URI="rtmp://media/live/news-pip" \
go run -tags gogst ./examples/14_news_pip_composite
默认参数(低负载 preset):
| 参数 | 默认值 |
|---|---|
| 分辨率 | 1280x720 |
| 帧率 | 15 fps |
| 码率 | 1500 kbps |
| 延迟 | 300 ms |
核心代码:
spec := stream.SessionSpec{
ID: "news-pip",
Pipeline: stream.PipelineSpec{
Composite: stream.CompositeSpec{
Enabled: true,
Mode: stream.CompositeModePIP,
Width: 1280,
Height: 720,
FPS: 15,
Background: "black",
AudioMode: stream.CompositeAudioFirst,
Inputs: []stream.CompositeInputSpec{
{
ID: "main", Type: stream.SourceTypeRTSP,
URI: mainRTSP,
X: 0, Y: 0, Width: 1280, Height: 720, ZOrder: 0,
},
{
ID: "pip", Type: stream.SourceTypeRTSP,
URI: pipRTSP,
X: 1056, Y: 606, Width: 200, Height: 112, ZOrder: 10,
},
},
},
},
}
学习目标:掌握 ABR 档位自动切换
学习目标:理解多路流的时间戳同步
学习目标:掌握 SEI 消息的注入与使用
学习目标:实现 VLC 播放器品牌元数据注入
学习目标:查询历史时间范围的流状态统计
运行示例:
go run ./examples/22_timeseries_aggregation
核心代码:
// 配置时间序列聚合器
cfg := stream.TimeSeriesConfig{
Enabled: true,
SampleInterval: 30 * time.Second,
MaxAge: 7 * 24 * time.Hour,
}
manager.SetTimeSeriesConfig(ctx, cfg)
// 查询历史统计
end := time.Now()
start := end.Add(-1 * time.Hour)
result, _ := manager.QueryTimeRange("session-id", start, end)
fmt.Printf("平均码率: %.2f Kbps\n", result.AvgBitrateKbps)
fmt.Printf("平均健康分: %.2f\n", result.AvgHealthScore)
学习目标:实现新闻直播场景下的实时画面切换与转场效果
核心特性:
运行示例:
# 预设多源模式(高资源,低延迟,支持淡入淡出)
SWITCH_MODE=preset \
OUTPUT_URI=rtmp://media/live/news \
AUTO_SWITCH=true \
AUTO_SWITCH_INTERVAL=10s \
go run -tags gogst ./examples/23_seamless_switch
# 动态切换模式(低资源,短暂黑屏)
SWITCH_MODE=dynamic \
OUTPUT_URI=rtmp://media/live/news \
AUTO_SWITCH=true \
go run -tags gogst ./examples/23_seamless_switch
模式对比:
| 特性 | 预设多源模式 | 动态切换模式 |
|---|---|---|
| 资源消耗 | 高(每路一路解码) | 低(仅一路解码) |
| 切换延迟 | 无 | 短暂黑屏 |
| 转场效果 | 支持淡入淡出 | 不支持 |
核心代码:
// 预设模式:所有源同时拉流,通过 Alpha 切换
Inputs: []CompositeInputSpec{
{ID: "anchor", Alpha: 1.0, ZOrder: 0}, // 显示主播
{ID: "field1", Alpha: 0.0, ZOrder: 1}, // 隐藏现场1
{ID: "field2", Alpha: 0.0, ZOrder: 2}, // 隐藏现场2
}
// 淡入淡出切换
for step := 0; step <= 10; step++ {
alpha := float64(step) / 10.0
newComposite.Inputs[0].Alpha = 1.0 - alpha // 主播淡出
newComposite.Inputs[1].Alpha = alpha // 现场1淡入
manager.UpdateCompositeLayout(ctx, sessionID, newComposite)
time.Sleep(50 * time.Millisecond)
}
// 动态模式:只拉流当前源,切换时更新 URI
newComposite.Inputs[0].URI = "rtsp://field1/live"
manager.UpdateCompositeLayout(ctx, sessionID, newComposite)
01_quickstart — 理解基本流程02_preset_override — 掌握配置优先级03_governance_checkpoint — 了解治理能力04_fusion_fanout — 路由分发05_rtsp_to_rtmp_template — 生产模板06_persistent_checkpoint — 持久化07_real_gst_pipeline_template — GStreamer 集成08_multi_session_scheduler — 批量调度09_alerting_exporter — 告警系统10_blue_green_rebuild — 热更新11_disaster_recovery_drill — 容灾13_hardware_diagnostics — 诊断工具14_news_pip_composite — 画面合成18_abr — 自适应码率19_time_sync — 时间同步20_sei — SEI 消息21_metadata_branding — 元数据22_timeseries_aggregation — 时间序列统计23_seamless_switch — 无缝画面切换| 变量 | 说明 |
|---|---|
EXAMPLE_MODE | 运行模式(mock/real) |
INPUT_RTSP_URI | 输入 RTSP 地址 |
OUTPUT_URI | 输出地址 |
| 变量 | 默认值 | 说明 |
|---|---|---|
OUTPUT_WIDTH | 1920 | 输出宽度 |
OUTPUT_HEIGHT | 1080 | 输出高度 |
OUTPUT_FPS | 25 | 输出帧率 |
OUTPUT_BITRATE_KBPS | 2500 | 目标码率 |
ENCODE_TYPE | h264 | 编码格式 |
| 变量 | 默认值 | 说明 |
|---|---|---|
INPUT_LATENCY_MS | 200 | 拉流延迟 |
RUN_FOR | 12s | 运行时长 |
# 检查 GStreamer 版本
gst-launch-1.0 --version
# 检查可用插件
gst-inspect-1.0 | grep -E "(rtsp|rtmp|h264)"
# 测试管道
gst-launch-1.0 uridecodebin uri="rtsp://..." ! ... ! fakesink
# 检查 CGO
go env CGO_ENABLED
# 检查 pkg-config
pkg-config --modversion gstreamer-1.0
# 清理重建
go clean -cache && go build -tags gogst ./...
MIT License