logo
0
0
WeChat Login

Stream 模块

Go Version License

生产级流媒体处理模块 — 面向工业视频检测系统的高性能流处理框架,支持自动配置探测、智能预设选择、会话生命周期管理与多厂商硬件加速。

特性

  • 零配置启动 — 通过自动探测与智能预设,最小化用户配置负担
  • 生产就绪 — 内置健康监控、事件溯源、指标导出与治理能力
  • 灵活扩展 — 基于接口的设计,支持自定义 Source、Sink、Engine 实现
  • 硬件无关 — 透明支持多厂商硬件加速(NVIDIA/Intel/AMD/Rockchip)
  • 多画面拼接 — 支持 Mosaic/PIP/Manual 三种模式的画面合成
  • 智能编码 — 支持统一编码语义与多编码器自动回退
  • AI 推理集成 — 轻量级推理后端抽象,支持 ONNX/TensorFlow/NCNN/TFLite
  • 低延迟传输 — 统一封装 SRT/WebRTC/RTMP 传输协议

目录

快速开始

最小示例

package main

import (
    "context"
    "fmt"
    "time"

    "cnb.cool/svn/stream"
)

func main() {
    // 创建管理器(需要 GStreamer 引擎或自定义 Source)
    engine := stream.NewGoGSTEngine()
    manager := stream.NewManager(engine, nil, nil)
    defer manager.Close()

    // 创建一键调用 API
    api := stream.NewOneCallAPI(manager, stream.OneCallConfig{
        AutoSelection: boolPtr(true),
        Metrics:       boolPtr(true),
        Events:        boolPtr(true),
    })

    // 构建请求
    req := stream.OneCallRequest{
        ID:         "camera-01",
        URI:        "rtsp://192.168.1.100/stream",
        SourceType: stream.SourceTypeRTSP,
    }

    // 启动会话
    result := api.StartSession(req)
    if result.Error != nil {
        panic(fmt.Sprintf("启动失败: %v", result.Error))
    }

    fmt.Printf("选择配置: %s\n", result.SelectedProfile)

    // 等待观察
    time.Sleep(5 * time.Second)

    // 停止会话
    api.StopSession("camera-01")
}

func boolPtr(b bool) *bool { return &b }

使用预设模板

result := api.StartSession(stream.OneCallRequest{
    ID:         "archive-01",
    URI:        "rtsp://192.168.1.100/stream",
    SourceType: stream.SourceTypeRTSP,
    Preset:     stream.PresetStorageArchiveH265Compact,
})

安装指南

环境要求

依赖版本要求说明
Go1.25.0+开发语言
GStreamer1.28.1+媒体处理(可选)
GLib2.64+GStreamer 依赖(可选)

基础安装

go get cnb.cool/svn/stream

构建模式

纯 Go 模式(默认)

适用于控制面场景或离线测试:

go build ./...
go test ./...

GStreamer 模式

完整媒体处理能力:

# 安装依赖
sudo apt-get install -y \
    libglib2.0-dev \
    libgstreamer1.0-dev \
    libgstreamer-plugins-base1.0-dev \
    gstreamer1.0-plugins-base \
    gstreamer1.0-plugins-good

# 构建
go build -tags gogst ./...
go test -tags gogst ./...

自定义 GStreamer 路径

export GST_ROOT=/workspace/bin
export PATH="$GST_ROOT/bin:$PATH"
export PKG_CONFIG_PATH="$GST_ROOT/lib/pkgconfig:${PKG_CONFIG_PATH:-}"
export CGO_CFLAGS="-I${GST_ROOT}/include"
export CGO_LDFLAGS="-L${GST_ROOT}/lib"
export LD_LIBRARY_PATH="$GST_ROOT/lib:${LD_LIBRARY_PATH:-}"
export GST_PLUGIN_PATH="$GST_ROOT/lib/gstreamer-1.0"

go build -tags gogst ./...

核心概念

架构分层

┌─────────────────────────────────────────────────────────────────┐
│                        Manager(管理器)                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────┐ │
│  │   Session   │  │   Session   │  │   EventBus  │  │ Metrics │ │
│  │  (RTSP)     │  │  (RTMP)     │  │  (Pub/Sub)  │  │(Counter)│ │
│  └──────┬──────┘  └──────┬──────┘  └─────────────┘  └─────────┘ │
│         │                │                                      │
│  ┌──────▼──────┐  ┌──────▼──────┐  ┌─────────────────────────┐  │
│  │   Engine    │  │   Engine    │  │    OneCallAPI           │  │
│  │  (go-gst)   │  │  (go-gst)   │  │ (Probe→Select→Merge)    │  │
│  └──────┬──────┘  └──────┬──────┘  └─────────────────────────┘  │
└─────────┼────────────────┼───────────────────────────────────────┘
          │                │
    ┌─────▼─────┐    ┌─────▼─────┐
    │  Source   │    │   Sink    │
    │  (RTSP)   │    │  (RTMP)   │
    └───────────┘    └───────────┘

组件说明

组件职责
Manager会话生命周期管理、资源调度、治理能力
Session单个流的处理会话,包含状态机与健康监控
Engine媒体处理引擎(GStreamer 实现)
Source/Sink输入输出抽象接口
EventBus事件总线,支持同步/异步分发
MetricsPrometheus 兼容指标收集
OneCallAPI一键式 API,简化调用流程

运行时边界

根据构建方式,系统提供三种能力等级:

能力等级需要条件支持能力
纯 Go 模式默认构建自定义 Source/Sink、数据转发
完整媒体模式-tags gogstGStreamer 管道、多画面拼接
VideoUnit 模式完整媒体模式 + 编码器插件H.264/H.265 编码推流

运行时选择

系统支持三种运行时配置方式:

1. GStreamer Engine 运行时(完整媒体处理)

// 创建 GStreamer 引擎(必须带 -tags gogst 构建)
engine := stream.NewGoGSTEngine()
manager := stream.NewManager(engine, nil, nil)

适用场景:需要完整媒体处理能力,包括转码、多画面拼接、复杂 filter 操作。

2. Source 运行时(轻量级拉流)

// 仅使用自定义 Source(无需 GStreamer)
manager := stream.NewManager(nil, customSource, nil)

适用场景:应用内生成帧、自定义协议拉流、无需复杂媒体处理的场景。

3. 双运行时(灵活组合)

// Engine 处理复杂 pipeline,Source 提供输入
manager := stream.NewManager(engine, customSource, nil)

适用场景:需要自定义输入源,同时保留 GStreamer 的处理能力。

API 使用

OneCall API

一键式 API,自动完成探测、选配、合并、创建全流程:

api := stream.NewOneCallAPI(manager, stream.OneCallConfig{
    AutoSelection: boolPtr(true),
    Metrics:       boolPtr(true),
    Events:        boolPtr(true),
})

result := api.StartSession(stream.OneCallRequest{
    ID:         "session-01",
    URI:        "rtsp://camera/live",
    SourceType: stream.SourceTypeRTSP,
    Preset:     stream.PresetPushRTMPLowLatency,
})

启动前检查

readiness := api.CheckStartupReadiness(req)
if !readiness.Ready {
    for _, check := range readiness.Checks {
        if check.Status == stream.StartupCheckFailed {
            fmt.Printf("%s: %s\n", check.Name, check.Message)
        }
    }
}

会话管理

// 启动会话
session, err := manager.StartSession(ctx, spec)

// 停止会话
session.Stop()

// 重建会话(热更新)
session.Rebuild(ctx, newSpec)

// 订阅帧
subID, frameCh := session.Subscribe(stream.SubscribeOptions{
    BufferSize:   32,
    DropPolicy:   stream.DropPolicyKeyframeAware,
    KeyFrameOnly: false,
})

// 取消订阅
session.Unsubscribe(subID)

// 获取统计
stats := session.Stats()

健康监控

// 健康评分(0-100)
state, window := session.HealthSnapshot()
fmt.Printf("评分: %d\n", state.Score)
fmt.Printf("可用性: %d, 质量: %d, 饱和度: %d\n",
    state.Breakdown.AvailabilityScore,
    state.Breakdown.QualityScore,
    state.Breakdown.SaturationScore)

// 连通性状态
switch state.Connectivity {
case stream.ConnectivityHealthy:
    // 正常运行
case stream.ConnectivityDegraded:
    // 间歇性问题
case stream.ConnectivityUnhealthy:
    // 持续失败
}

多画面拼接

支持三种布局模式:

模式说明路数限制
mosaic网格拼接2-16 路
pip画中画固定 2 路
manual手工布局2-16 路

使用示例

spec := stream.SessionSpec{
    ID: "composite-4x4",
    Pipeline: stream.PipelineSpec{
        Composite: stream.CompositeSpec{
            Enabled:    true,
            Mode:       stream.CompositeModeManual,
            Width:      1920,
            Height:     1080,
            FPS:        25,
            Background: "black",
            AudioMode:  stream.CompositeAudioFirst,
            Inputs: []stream.CompositeInputSpec{
                {
                    ID:   "1",
                    Type: stream.SourceTypeRTSP,
                    URI:  "rtsp://192.168.1.101/stream1",
                    X:    0, Y: 0, Width: 960, Height: 540,
                },
                {
                    ID:   "2",
                    Type: stream.SourceTypeRTSP,
                    URI:  "rtsp://192.168.1.102/stream2",
                    X:    960, Y: 0, Width: 960, Height: 540,
                },
                // ... 更多输入
            },
        },
    },
}

session, err := manager.StartSession(ctx, spec)

画中画(PIP)

stream.CompositeSpec{
    Enabled: true,
    Mode:    stream.CompositeModePIP,
    Width:   1920,
    Height: 1080,
    Inputs: []stream.CompositeInputSpec{
        // 主画面
        {
            ID: "main", Type: stream.SourceTypeRTSP,
            URI: "rtsp://cam-main/live",
            X: 0, Y: 0, Width: 1920, Height: 1080, ZOrder: 0,
        },
        // 小窗
        {
            ID: "pip", Type: stream.SourceTypeRTSP,
            URI: "rtsp://cam-guest/live",
            X: 1440, Y: 810, Width: 480, Height: 270, ZOrder: 10,
        },
    },
}

运行时热更新

newLayout := spec.Pipeline.Composite
newLayout.Inputs[0].URI = "rtsp://new-camera/live"
err := manager.UpdateCompositeLayout(ctx, sessionID, newLayout)

订阅-发布与快照

StreamHub

按 URL 自动去重,单一连接多订阅者:

hub := stream.NewStreamHub(manager)

// 三个消费者订阅同一流
const url = "rtsp://192.168.1.101/stream"

_, ch1, rel1, _ := hub.Subscribe(ctx, url, stream.SubscribeOptions{})
defer rel1()

_, ch2, rel2, _ := hub.Subscribe(ctx, url, stream.SubscribeOptions{})
defer rel2()

go detectObjects(ch1)
go analyzePose(ch2)

// 截图预览(无需订阅)
frame, _ := hub.LastVideoFrame(url)

Session 快照

// 零开销截图
frame := session.LastVideoFrame()
if frame != nil {
    saveJPEG(frame.Payload)
}

// 等待下一关键帧
frame, err := session.WaitForKeyFrame(ctx, 2*time.Second)

// 批量收集
samples, _ := session.Drain(ctx, 5, stream.SubscribeOptions{
    KeyFrameOnly: true,
})

高级特性

硬件加速

支持多厂商硬件编解码:

厂商加速器编码器
NVIDIANVENCnvh264enc, nvh265enc
IntelQSV/VAAPIqsvh264enc, vaapih264enc
AMDAMF/VAAPIamfh264enc, vaapih264enc
RockchipMPP/V4L2mpph264enc, v4l2h264enc

智能编码参数

result := api.StartSession(stream.OneCallRequest{
    ID: "encoding-test",
    URI: "rtsp://camera/live",
    SourceType: stream.SourceTypeRTSP,
    Preset: stream.PresetTranscodeStandardH264AAC,
    Constraints: stream.ProfileConstraint{
        RequireHWAccel: true,
    },
})

事件订阅

events := manager.EventBus()
events.Subscribe("monitor", stream.EventHandlerFunc(func(e stream.Event) {
    switch e.Type {
    case stream.EventError:
        fmt.Printf("错误: %s\n", e.Message)
    case stream.EventHealthScore:
        fmt.Printf("健康分: %v\n", e.Data)
    }
}))

指标导出

metrics := manager.Metrics()
promText := metrics.ExportPrometheusText()

时间序列聚合

// 配置时间序列聚合器
cfg := stream.TimeSeriesConfig{
    Enabled:        true,
    SampleInterval: 30 * time.Second,
    MaxAge:         7 * 24 * time.Hour,
}
manager.SetTimeSeriesConfig(ctx, cfg)

// 查询历史统计
end := time.Now()
start := end.Add(-1 * time.Hour)
result, _ := manager.QueryTimeRange("session-id", start, end)

fmt.Printf("平均码率: %.2f Kbps\n", result.AvgBitrateKbps)
fmt.Printf("平均健康分: %.2f\n", result.AvgHealthScore)

扩展开发

自定义 Source

自定义 Source 允许从任意数据源获取帧数据,适用于:

  • 应用内生成帧(如屏幕捕获、图像序列)
  • 自定义协议(如私有流媒体协议)
  • 内存数据流(如共享内存、管道)

接口定义

// 基础 Source 接口
type Source interface {
    Open(ctx context.Context, spec SourceSpec) (SourceSession, error)
}

// 可感知完整会话规格的 Source(推荐)
type SessionAwareSource interface {
    OpenSession(ctx context.Context, spec SessionSpec) (SourceSession, error)
}

// SourceSession 必须实现
type SourceSession interface {
    ReadFrame(ctx context.Context) (*Frame, error)
    Close() error
}

实现示例:图像序列源

// ImageSequenceSource 从图片文件序列读取帧
type ImageSequenceSource struct {
    directory string
    pattern   string     // 如 "frame_%04d.jpg"
    fps       int
}

func (s *ImageSequenceSource) Open(ctx context.Context, spec stream.SourceSpec) (stream.SourceSession, error) {
    return &imageSeqSession{
        source:   s,
        index:    0,
        lastTime: time.Now(),
        interval: time.Second / time.Duration(s.fps),
    }, nil
}

type imageSeqSession struct {
    source   *ImageSequenceSource
    index    int
    lastTime time.Time
    interval time.Duration
}

func (s *imageSeqSession) ReadFrame(ctx context.Context) (*stream.Frame, error) {
    // 控制帧率
    elapsed := time.Since(s.lastTime)
    if elapsed < s.interval {
        time.Sleep(s.interval - elapsed)
    }
    s.lastTime = time.Now()

    // 构造文件路径并读取
    filename := fmt.Sprintf(filepath.Join(s.source.directory, s.source.pattern), s.index)
    s.index++

    // 读取并解码图片(此处省略具体实现)
    data, err := os.ReadFile(filename)
    if err != nil {
        return nil, err
    }

    return &stream.Frame{
        SourceID:   "image-seq",
        Type:       stream.MediaTypeVideo,
        Timestamp:  time.Now(),
        IsKeyFrame: true,
        Payload:    data,
    }, nil
}

func (s *imageSeqSession) Close() error {
    return nil
}

使用自定义 Source

source := &ImageSequenceSource{
    directory: "/path/to/frames",
    pattern:   "frame_%04d.jpg",
    fps:       25,
}

manager := stream.NewManager(nil, source, nil)
defer manager.Close()

session, err := manager.StartSession(ctx, stream.SessionSpec{
    ID: "image-sequence-session",
    Pipeline: stream.PipelineSpec{
        Pull: stream.SourceSpec{
            ID:   "image-seq",
            Type: stream.SourceTypeApp,
        },
    },
})

自定义 Sink

自定义 Sink 允许将帧数据输出到任意目标,适用于:

  • 自定义推流协议
  • 本地录制
  • AI 推理管道

接口定义

type Sink interface {
    Open(ctx context.Context, spec SinkSpec) (SinkSession, error)
}

type SinkSession interface {
    WriteFrame(ctx context.Context, frame *Frame) error
    Close() error
}

实现示例:文件录制 Sink

// FileRecorderSink 将帧录制到文件
type FileRecorderSink struct {
    outputDir string
}

func (s *FileRecorderSink) Open(ctx context.Context, spec stream.SinkSpec) (stream.SinkSession, error) {
    outputPath := filepath.Join(s.outputDir, spec.ID+".mkv")
    return &fileRecorderSession{
        outputPath: outputPath,
        // 初始化录制器...
    }, nil
}

type fileRecorderSession struct {
    outputPath string
    // 录制器状态...
}

func (s *fileRecorderSession) WriteFrame(ctx context.Context, frame *stream.Frame) error {
    // 写入帧到文件
    return nil
}

func (s *fileRecorderSession) Close() error {
    // 关闭文件并完成录制
    return nil
}

自定义 Engine(高级)

对于完全自定义的媒体处理管道,可以实现 Engine 接口:

type Engine interface {
    Name() string
    Start(ctx context.Context, runtime EngineRuntime) (EngineSession, error)
}

type EngineSession interface {
    Rebuild(ctx context.Context, pipeline PipelineSpec, fusion FusionSpec) error
    Stop(ctx context.Context) error
}

注意:自定义 Engine 仅在需要完全替代 GStreamer 时使用,大多数场景下自定义 Source/Sink 已足够。

最佳实践

生产硬化

manager := stream.NewManager(engine, source, sink)
manager.SetMaxSessions(2048)
manager.Metrics().SetMaxSeries(50000)

api := stream.NewOneCallAPI(manager, stream.OneCallConfig{
    AutoSelection: boolPtr(true),
    Metrics:       boolPtr(true),
    Events:        boolPtr(true),
})

错误处理

result := api.StartSession(req)
if result.Error != nil {
    switch result.ErrorReason {
    case stream.OneCallReasonRuntimeMissingDependency:
        // 缺少运行时依赖
    case stream.OneCallReasonSessionAlreadyExists:
        // 会话已存在
    case stream.OneCallReasonHardwareForcedUnavailable:
        // 硬件加速不可用
    }
}

会话恢复

// 设置检查点存储
manager.SetCheckpointStore(stream.NewFileCheckpointStore("/data/checkpoints"))

// 进程重启后恢复
recovered, err := manager.RecoverSessions(context.Background())

文档索引

文档说明
DESIGN.md架构设计文档
docs/ARCHITECTURE.md详细架构说明
docs/API_REFERENCE.mdAPI 参考
inference/README.mdAI 推理集成
lowlatency/README.md低延迟传输
examples/README.md示例集合

许可证

MIT License

About

No description, topics, or website provided.
Language
Go75%
C22.1%
Shell2.3%
Makefile0.4%
Others0.2%