logo
0
0
WeChat Login
feat: 添加无缝画面切换示例

Stream 示例

本目录提供 cnb.cool/svn/stream 的渐进式示例,帮助开发者从零快速上手到生产部署。所有示例均遵循 Go 语言最佳实践,展示真实生产场景的代码实现。

示例概览

序号示例说明难度
01quickstart最小控制面链路入门
02preset_overridePreset 与 Override 组合入门
03governance_checkpoint治理能力(鉴权、审计、恢复)进阶
04fusion_fanoutFusion FanOut 路由进阶
05rtsp_to_rtmp_template生产模板进阶
06persistent_checkpoint文件持久化 checkpoint进阶
07real_gst_pipeline_template真实 GStreamer 模板进阶
08multi_session_scheduler多会话调度进阶
09alerting_exporter事件与指标联动告警高级
10blue_green_rebuild蓝绿重建/回滚演练高级
11disaster_recovery_drill容灾演练高级
12cloud_uplink_profiles上云预设矩阵高级
13hardware_diagnostics硬件诊断工具
14news_pip_composite新闻画中画应用
18abr自适应码率特性
19time_sync多路时间戳对齐特性
20seiSEI 消息注入特性
21metadata_brandingVLC 品牌元数据特性
22timeseries_aggregation时间序列聚合统计特性
23seamless_switch无缝画面切换应用

快速开始

运行模式说明

所有示例支持两种运行模式:

模式环境变量说明
MockEXAMPLE_MODE=mock离线模式,使用模拟数据
RealEXAMPLE_MODE=real真实模式,连接实际设备

基础示例

# 离线 smoke
EXAMPLE_MODE=mock go run ./examples/01_quickstart

# 真实链路
EXAMPLE_MODE=real \
INPUT_RTSP_URI="rtsp://camera/live" \
go run -tags gogst ./examples/01_quickstart

示例详解

01_quickstart — 最小控制面链路

学习目标:掌握最基本的会话创建与生命周期管理

运行示例

# Mock 模式
EXAMPLE_MODE=mock go run ./examples/01_quickstart

# Real 模式
EXAMPLE_MODE=real \
INPUT_RTSP_URI="rtsp://camera/live" \
go run -tags gogst ./examples/01_quickstart

核心代码

// 创建 GStreamer 引擎
engine := stream.NewGoGSTEngine()

// 创建管理器(需要 Engine、Source、Sink 中的至少一个)
manager := stream.NewManager(engine, nil, nil)
defer manager.Close()

// 创建一键调用 API
api := stream.NewOneCallAPI(manager, stream.OneCallConfig{})

// 构建请求
req := stream.OneCallRequest{
    ID:         "demo-01",
    URI:        "rtsp://camera/live",
    SourceType: stream.SourceTypeRTSP,
}

// 启动会话
result := api.StartSession(req)
if result.Error != nil {
    panic(fmt.Sprintf("启动失败: %v", result.Error))
}

// 等待观察
time.Sleep(5 * time.Second)

// 停止会话
api.StopSession("demo-01")

关键概念

  • Manager:会话生命周期管理器
  • Engine:媒体处理引擎(GStreamer 实现)
  • OneCallAPI:一键式 API,自动完成探测→选配→合并→创建
  • Session:单个流的处理会话

02_preset_override — Preset 与 Override

学习目标:理解预设模板与显式覆盖的优先级

运行示例

EXAMPLE_MODE=mock go run ./examples/02_preset_override

配置优先级:系统默认 < 预设配置 < 自动推荐 < 用户显式覆盖

核心代码

result := api.StartSession(stream.OneCallRequest{
    ID:         "preset-01",
    URI:        "rtsp://camera/live",
    SourceType: stream.SourceTypeRTSP,
    Preset:     stream.PresetStorageArchiveH265Compact,
    Override: stream.ProfileOverride{
        Transcode: &stream.TranscodeSpec{
            VideoCodec: "h265",
            Bitrate:    1000000, // 显式覆盖码率
        },
    },
})

内置预设

  • storage.archive.copy_prefer — 存储优先复制
  • storage.archive.h265_compact — H.265 紧凑存储
  • push.rtmp.low_latency — RTMP 低延迟推流
  • push.srt.robust — SRT 稳健推流
  • push.cloud.h265_uplink_economy — 上云经济型
  • push.cloud.h265_uplink_balanced — 上云均衡型
  • transcode.standard.h264_aac — 标准 H.264/AAC 转码
  • inspection.ai_realtime — AI 实时检测

03_governance_checkpoint — 治理能力

学习目标:掌握鉴权、审计、检查点恢复

运行示例

EXAMPLE_MODE=mock go run ./examples/03_governance_checkpoint

核心能力

  • 鉴权器(Authorizer) — 控制操作权限
  • 审计器(Auditor) — 记录操作日志
  • 检查点(Checkpoint) — 会话持久化

核心代码

// 设置鉴权器
manager.SetAuthorizer(stream.AuthorizerFunc(func(ctx context.Context, op string, spec stream.SessionSpec) error {
    if !strings.HasPrefix(spec.ID, "allowed-") {
        return fmt.Errorf("会话 ID 必须以 'allowed-' 开头")
    }
    return nil
}))

// 设置审计器
manager.SetAuditor(stream.AuditorFunc(func(ctx context.Context, op string, spec stream.SessionSpec, err error) {
    log.Printf("[AUDIT] op=%s id=%s err=%v", op, spec.ID, err)
}))

// 设置检查点存储
manager.SetCheckpointStore(stream.NewMemoryCheckpointStore())

// 启动会话
session, err := manager.StartSession(ctx, spec)

// 进程重启后恢复
recovered, err := manager.RecoverSessions(ctx)

04_fusion_fanout — 路由分发

学习目标:理解单输入多输出的路由模式

运行示例

EXAMPLE_MODE=mock go run ./examples/04_fusion_fanout

应用场景:一路摄像头同时推送到多个服务(RTMP、录制、AI 分析)

核心代码

spec := stream.SessionSpec{
    ID: "fusion-demo",
    Pipeline: stream.PipelineSpec{
        Pull: stream.SourceSpec{
            ID:   "camera",
            Type: stream.SourceTypeRTSP,
            URI:  "rtsp://camera/live",
        },
    },
    Fusion: stream.FusionSpec{
        Enabled: true,
        FanOut: []stream.FusionFanOutTarget{
            {ID: "rtmp", URI: "rtmp://server/live/stream"},
            {ID: "file", URI: "file:///tmp/record.mp4"},
        },
    },
}

关键特性

  • 单路拉流,多路推流
  • 每路输出独立配置
  • 故障隔离,单路失败不影响其他路

05_rtsp_to_rtmp_template — 生产模板

学习目标:掌握生产级 RTSP 到 RTMP 转推

运行示例

# 预览模式(默认)
go run ./examples/05_rtsp_to_rtmp_template

# 真实模式
REAL_MODE=true \
INPUT_RTSP_URI="rtsp://camera/live" \
OUTPUT_URI="rtmp://media/live/stream" \
go run -tags gogst ./examples/05_rtsp_to_rtmp_template

关键参数

参数默认值说明
OUTPUT_WIDTH1920输出宽度
OUTPUT_HEIGHT1080输出高度
OUTPUT_FPS25输出帧率
OUTPUT_BITRATE_KBPS2500目标码率
RUN_FOR12s运行时长

核心代码

req := stream.VideoUnitRequest{
    EncodeType: stream.VideoUnitEncodeH264,
    OutputType: stream.VideoUnitOutputRTMP,
    OutputURI:  "rtmp://media/live/stream",
    RTSPURL:    "rtsp://camera/live",
    TaskID:     "rtmp-push-01",
    Width:      1920,
    Height:     1080,
    FPS:        25,
    LatencyMS:  200,
    X26xExtra: stream.VideoUnitX26xExtra{
        BPS:    2500 * 1000,
        GOP:    50,
        QPInit: 26,
        QPMin:  26,
        QPMax:  50,
        RCMode: 0, // CBR
    },
}

06_persistent_checkpoint — 持久化检查点

学习目标:掌握文件持久化与进程恢复

运行示例

EXAMPLE_MODE=mock go run ./examples/06_persistent_checkpoint

检查点文件/tmp/stream_checkpoints_example.json

核心代码

// 创建文件检查点存储
store := stream.NewFileCheckpointStore("/tmp/stream_checkpoints_example.json")
manager.SetCheckpointStore(store)

// 启动会话
session, err := manager.StartSession(ctx, spec)

// 模拟进程重启
manager.Close()

// 创建新管理器并恢复
newManager := stream.NewManager(engine, nil, nil)
newManager.SetCheckpointStore(store)
recovered, err := newManager.RecoverSessions(ctx)

07_real_gst_pipeline_template — GStreamer 模板

学习目标:理解真实 GStreamer 管道的构建与使用

环境检查

bash ./examples/07_real_gst_pipeline_template/check_gst_env.sh

运行示例

GST_ROOT="/workspace/app/dynamic/1.28.1" \
PKG_CONFIG_PATH="$GST_ROOT/lib/pkgconfig" \
CGO_CFLAGS="-I$GST_ROOT/include" \
CGO_LDFLAGS="-L$GST_ROOT/lib" \
LD_LIBRARY_PATH="$GST_ROOT/lib:$LD_LIBRARY_PATH" \
REAL_MODE=true \
INPUT_RTSP_URI="rtsp://camera/live" \
OUTPUT_URI="rtmp://media/live/stream" \
go run -tags gogst ./examples/07_real_gst_pipeline_template

核心代码

// 使用 VideoUnit facade 封装 GStreamer 管道
result := stream.ValidateVideoUnitRequest(ctx, req)
if !result.Accepted {
    log.Printf("配置验证失败: %s, 建议: %s", result.ErrorCode, result.NextAction)
    return
}

unit, err := stream.NewVideoUnit(ctx, req, engine, nil)
if err != nil {
    log.Printf("创建 VideoUnit 失败: %v", err)
    return
}

// 启动处理
if err := unit.Start(); err != nil {
    log.Printf("启动失败: %v", err)
    return
}

08_multi_session_scheduler — 多会话调度

学习目标:掌握批量会话启动与调度控制

运行示例

# Mock 模式
EXAMPLE_MODE=mock go run ./examples/08_multi_session_scheduler

# Real 模式
EXAMPLE_MODE=real \
SCHED_INPUT_RTSP_TEMPLATE="rtsp://camera-%02d/live" \
SCHED_OUTPUT_URI_TEMPLATE="rtmp://media/live/scheduler-%02d" \
go run -tags gogst ./examples/08_multi_session_scheduler

调度参数

参数默认值说明
SCHED_SESSION_COUNT6会话数量
SCHED_MAX_CONCURRENT_STARTS2并发启动上限
SCHED_START_THROTTLE150ms启动节流间隔
SCHED_START_RETRY_MAX4最大重试次数

核心代码

scheduler := stream.NewSessionStartScheduler(manager, stream.SessionStartSchedulerConfig{
    MaxConcurrentStarts: 2,
    StartThrottle:       150 * time.Millisecond,
    RetryMax:            4,
})

for i := 0; i < sessionCount; i++ {
    id := fmt.Sprintf("sched-%03d", i)
    req := stream.OneCallRequest{
        ID:         id,
        URI:        fmt.Sprintf(inputTemplate, i),
        SourceType: stream.SourceTypeRTSP,
    }
    scheduler.Schedule(ctx, id, req)
}

// 等待所有会话启动完成
results := scheduler.Wait(ctx)

09_alerting_exporter — 告警导出

学习目标:掌握事件与指标联动告警

运行示例

EXAMPLE_MODE=real \
INPUT_RTSP_URI="rtsp://camera/live" \
OUTPUT_URI="rtmp://media/live/stream" \
go run -tags gogst ./examples/09_alerting_exporter

告警参数

参数默认值说明
ALERT_RECONNECT_THRESHOLD2重连告警阈值
ALERT_ERROR_EVENT_THRESHOLD3错误告警阈值
ALERT_CRITICAL_EVENT_THRESHOLD1严重告警阈值

核心代码

// 订阅事件
events.Subscribe("alerter", stream.EventHandlerFunc(func(e stream.Event) {
    switch e.Type {
    case stream.EventReconnect:
        reconnectCount.Add(1)
        if reconnectCount.Load() >= alertReconnectThreshold {
            triggerAlert("重连次数过多")
        }
    case stream.EventError:
        if e.Severity == stream.SeverityCritical {
            triggerAlert("严重错误")
        }
    case stream.EventHealthScore:
        if score, ok := e.Data["score"].(int); ok && score < 50 {
            triggerAlert("健康分过低")
        }
    }
}))

10_blue_green_rebuild — 蓝绿重建

学习目标:掌握配置热更新与自动回滚

运行示例

EXAMPLE_MODE=mock go run ./examples/10_blue_green_rebuild

流程:申请切换 → 观察验证 → 自动回滚(如失败)

核心代码

// 获取当前规格
currentSpec := session.SpecSnapshot()

// 修改配置
newSpec := currentSpec
newSpec.Pipeline.Transcode.Bitrate = 5000000 // 提高码率

// 执行蓝绿重建
err := session.Rebuild(ctx, newSpec)
if err != nil {
    // 重建失败,自动回滚到原配置
    log.Printf("重建失败,回滚: %v", err)
    session.Rebuild(ctx, currentSpec)
}

11_disaster_recovery_drill — 容灾演练

学习目标:完成完整的容灾流程演练

运行示例

EXAMPLE_MODE=mock go run ./examples/11_disaster_recovery_drill

演练阶段:稳定运行 → 故障注入 → 退化检测 → 回滚恢复 → 验收

12_cloud_uplink_profiles — 上云预设

学习目标:了解上云场景的预设矩阵

运行示例

EXAMPLE_MODE=mock go run ./examples/12_cloud_uplink_profiles

预设类型

  • economy — 弱网省带宽
  • balanced — 通用均衡
  • dualstream — 主辅双码流

13_hardware_diagnostics — 硬件诊断

学习目标:诊断宿主硬件能力与插件状态

运行示例

go run ./examples/13_hardware_diagnostics

输出内容

  • 理论能力基线
  • 实测能力快照
  • 缺失能力报告

核心代码

// 执行硬件诊断
diagnostics := api.DiagnoseHardware()

// 打印能力快照
fmt.Printf("可用厂商: %v\n", diagnostics.Snapshot.AvailableVendors)
fmt.Printf("可用加速器: %v\n", diagnostics.Snapshot.AvailableAccelerators)

// 打印缺失能力
for _, vendor := range diagnostics.Vendors {
    if len(vendor.MissingEncoders) > 0 {
        fmt.Printf("厂商 %s 缺失编码器: %v\n", vendor.Vendor, vendor.MissingEncoders)
    }
}

14_news_pip_composite — 新闻画中画

学习目标:实现新闻场景的主画面 + 嘉宾小窗

运行示例

# 预览
go run ./examples/14_news_pip_composite

# 真实链路
EXAMPLE_MODE=real \
MAIN_RTSP_URI="rtsp://cam-main/live" \
PIP_RTSP_URI="rtsp://cam-guest/live" \
OUTPUT_URI="rtmp://media/live/news-pip" \
go run -tags gogst ./examples/14_news_pip_composite

默认参数(低负载 preset):

参数默认值
分辨率1280x720
帧率15 fps
码率1500 kbps
延迟300 ms

核心代码

spec := stream.SessionSpec{
    ID: "news-pip",
    Pipeline: stream.PipelineSpec{
        Composite: stream.CompositeSpec{
            Enabled:    true,
            Mode:       stream.CompositeModePIP,
            Width:      1280,
            Height:     720,
            FPS:        15,
            Background: "black",
            AudioMode:  stream.CompositeAudioFirst,
            Inputs: []stream.CompositeInputSpec{
                {
                    ID: "main", Type: stream.SourceTypeRTSP,
                    URI: mainRTSP,
                    X: 0, Y: 0, Width: 1280, Height: 720, ZOrder: 0,
                },
                {
                    ID: "pip", Type: stream.SourceTypeRTSP,
                    URI: pipRTSP,
                    X: 1056, Y: 606, Width: 200, Height: 112, ZOrder: 10,
                },
            },
        },
    },
}

18_abr — 自适应码率

学习目标:掌握 ABR 档位自动切换

19_time_sync — 时间戳对齐

学习目标:理解多路流的时间戳同步

20_sei — SEI 消息注入

学习目标:掌握 SEI 消息的注入与使用

21_metadata_branding — 品牌元数据

学习目标:实现 VLC 播放器品牌元数据注入

22_timeseries_aggregation — 时间序列聚合

学习目标:查询历史时间范围的流状态统计

运行示例

go run ./examples/22_timeseries_aggregation

核心代码

// 配置时间序列聚合器
cfg := stream.TimeSeriesConfig{
    Enabled:        true,
    SampleInterval: 30 * time.Second,
    MaxAge:         7 * 24 * time.Hour,
}
manager.SetTimeSeriesConfig(ctx, cfg)

// 查询历史统计
end := time.Now()
start := end.Add(-1 * time.Hour)
result, _ := manager.QueryTimeRange("session-id", start, end)

fmt.Printf("平均码率: %.2f Kbps\n", result.AvgBitrateKbps)
fmt.Printf("平均健康分: %.2f\n", result.AvgHealthScore)

23_seamless_switch — 无缝画面切换

学习目标:实现新闻直播场景下的实时画面切换与转场效果

核心特性

  • 不中断推流的热更新能力
  • Alpha 淡入淡出转场
  • 两种模式权衡资源与延迟

运行示例

# 预设多源模式(高资源,低延迟,支持淡入淡出)
SWITCH_MODE=preset \
OUTPUT_URI=rtmp://media/live/news \
AUTO_SWITCH=true \
AUTO_SWITCH_INTERVAL=10s \
go run -tags gogst ./examples/23_seamless_switch

# 动态切换模式(低资源,短暂黑屏)
SWITCH_MODE=dynamic \
OUTPUT_URI=rtmp://media/live/news \
AUTO_SWITCH=true \
go run -tags gogst ./examples/23_seamless_switch

模式对比

特性预设多源模式动态切换模式
资源消耗高(每路一路解码)低(仅一路解码)
切换延迟短暂黑屏
转场效果支持淡入淡出不支持

核心代码

// 预设模式:所有源同时拉流,通过 Alpha 切换
Inputs: []CompositeInputSpec{
    {ID: "anchor", Alpha: 1.0, ZOrder: 0},  // 显示主播
    {ID: "field1", Alpha: 0.0, ZOrder: 1},  // 隐藏现场1
    {ID: "field2", Alpha: 0.0, ZOrder: 2},  // 隐藏现场2
}

// 淡入淡出切换
for step := 0; step <= 10; step++ {
    alpha := float64(step) / 10.0
    newComposite.Inputs[0].Alpha = 1.0 - alpha  // 主播淡出
    newComposite.Inputs[1].Alpha = alpha        // 现场1淡入
    manager.UpdateCompositeLayout(ctx, sessionID, newComposite)
    time.Sleep(50 * time.Millisecond)
}

// 动态模式:只拉流当前源,切换时更新 URI
newComposite.Inputs[0].URI = "rtsp://field1/live"
manager.UpdateCompositeLayout(ctx, sessionID, newComposite)

学习路线

初学者

  1. 01_quickstart — 理解基本流程
  2. 02_preset_override — 掌握配置优先级
  3. 03_governance_checkpoint — 了解治理能力

进阶开发

  1. 04_fusion_fanout — 路由分发
  2. 05_rtsp_to_rtmp_template — 生产模板
  3. 06_persistent_checkpoint — 持久化
  4. 07_real_gst_pipeline_template — GStreamer 集成
  5. 08_multi_session_scheduler — 批量调度

高级特性

  1. 09_alerting_exporter — 告警系统
  2. 10_blue_green_rebuild — 热更新
  3. 11_disaster_recovery_drill — 容灾
  4. 13_hardware_diagnostics — 诊断工具

特性专题

  • 14_news_pip_composite — 画面合成
  • 18_abr — 自适应码率
  • 19_time_sync — 时间同步
  • 20_sei — SEI 消息
  • 21_metadata_branding — 元数据
  • 22_timeseries_aggregation — 时间序列统计
  • 23_seamless_switch — 无缝画面切换

环境变量速查

通用变量

变量说明
EXAMPLE_MODE运行模式(mock/real)
INPUT_RTSP_URI输入 RTSP 地址
OUTPUT_URI输出地址

编码参数

变量默认值说明
OUTPUT_WIDTH1920输出宽度
OUTPUT_HEIGHT1080输出高度
OUTPUT_FPS25输出帧率
OUTPUT_BITRATE_KBPS2500目标码率
ENCODE_TYPEh264编码格式

运行参数

变量默认值说明
INPUT_LATENCY_MS200拉流延迟
RUN_FOR12s运行时长

故障排查

GStreamer 相关

# 检查 GStreamer 版本
gst-launch-1.0 --version

# 检查可用插件
gst-inspect-1.0 | grep -E "(rtsp|rtmp|h264)"

# 测试管道
gst-launch-1.0 uridecodebin uri="rtsp://..." ! ... ! fakesink

编译问题

# 检查 CGO
go env CGO_ENABLED

# 检查 pkg-config
pkg-config --modversion gstreamer-1.0

# 清理重建
go clean -cache && go build -tags gogst ./...

许可证

MIT License