生产级流媒体处理模块 — 面向工业视频检测系统的高性能流处理框架,支持自动配置探测、智能预设选择、会话生命周期管理与多厂商硬件加速。
package main
import (
"context"
"fmt"
"time"
"cnb.cool/svn/stream"
)
func main() {
// 创建管理器(需要 GStreamer 引擎或自定义 Source)
engine := stream.NewGoGSTEngine()
manager := stream.NewManager(engine, nil, nil)
defer manager.Close()
// 创建一键调用 API
api := stream.NewOneCallAPI(manager, stream.OneCallConfig{
AutoSelection: boolPtr(true),
Metrics: boolPtr(true),
Events: boolPtr(true),
})
// 构建请求
req := stream.OneCallRequest{
ID: "camera-01",
URI: "rtsp://192.168.1.100/stream",
SourceType: stream.SourceTypeRTSP,
}
// 启动会话
result := api.StartSession(req)
if result.Error != nil {
panic(fmt.Sprintf("启动失败: %v", result.Error))
}
fmt.Printf("选择配置: %s\n", result.SelectedProfile)
// 等待观察
time.Sleep(5 * time.Second)
// 停止会话
api.StopSession("camera-01")
}
func boolPtr(b bool) *bool { return &b }
result := api.StartSession(stream.OneCallRequest{
ID: "archive-01",
URI: "rtsp://192.168.1.100/stream",
SourceType: stream.SourceTypeRTSP,
Preset: stream.PresetStorageArchiveH265Compact,
})
| 依赖 | 版本要求 | 说明 |
|---|---|---|
| Go | 1.25.0+ | 开发语言 |
| GStreamer | 1.28.1+ | 媒体处理(可选) |
| GLib | 2.64+ | GStreamer 依赖(可选) |
go get cnb.cool/svn/stream
适用于控制面场景或离线测试:
go build ./...
go test ./...
完整媒体处理能力:
# 安装依赖
sudo apt-get install -y \
libglib2.0-dev \
libgstreamer1.0-dev \
libgstreamer-plugins-base1.0-dev \
gstreamer1.0-plugins-base \
gstreamer1.0-plugins-good
# 构建
go build -tags gogst ./...
go test -tags gogst ./...
export GST_ROOT=/workspace/bin
export PATH="$GST_ROOT/bin:$PATH"
export PKG_CONFIG_PATH="$GST_ROOT/lib/pkgconfig:${PKG_CONFIG_PATH:-}"
export CGO_CFLAGS="-I${GST_ROOT}/include"
export CGO_LDFLAGS="-L${GST_ROOT}/lib"
export LD_LIBRARY_PATH="$GST_ROOT/lib:${LD_LIBRARY_PATH:-}"
export GST_PLUGIN_PATH="$GST_ROOT/lib/gstreamer-1.0"
go build -tags gogst ./...
┌─────────────────────────────────────────────────────────────────┐
│ Manager(管理器) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │
│ │ Session │ │ Session │ │ EventBus │ │ Metrics │ │
│ │ (RTSP) │ │ (RTMP) │ │ (Pub/Sub) │ │(Counter)│ │
│ └──────┬──────┘ └──────┬──────┘ └─────────────┘ └─────────┘ │
│ │ │ │
│ ┌──────▼──────┐ ┌──────▼──────┐ ┌─────────────────────────┐ │
│ │ Engine │ │ Engine │ │ OneCallAPI │ │
│ │ (go-gst) │ │ (go-gst) │ │ (Probe→Select→Merge) │ │
│ └──────┬──────┘ └──────┬──────┘ └─────────────────────────┘ │
└─────────┼────────────────┼───────────────────────────────────────┘
│ │
┌─────▼─────┐ ┌─────▼─────┐
│ Source │ │ Sink │
│ (RTSP) │ │ (RTMP) │
└───────────┘ └───────────┘
| 组件 | 职责 |
|---|---|
| Manager | 会话生命周期管理、资源调度、治理能力 |
| Session | 单个流的处理会话,包含状态机与健康监控 |
| Engine | 媒体处理引擎(GStreamer 实现) |
| Source/Sink | 输入输出抽象接口 |
| EventBus | 事件总线,支持同步/异步分发 |
| Metrics | Prometheus 兼容指标收集 |
| OneCallAPI | 一键式 API,简化调用流程 |
根据构建方式,系统提供三种能力等级:
| 能力等级 | 需要条件 | 支持能力 |
|---|---|---|
| 纯 Go 模式 | 默认构建 | 自定义 Source/Sink、数据转发 |
| 完整媒体模式 | -tags gogst | GStreamer 管道、多画面拼接 |
| VideoUnit 模式 | 完整媒体模式 + 编码器插件 | H.264/H.265 编码推流 |
系统支持三种运行时配置方式:
// 创建 GStreamer 引擎(必须带 -tags gogst 构建)
engine := stream.NewGoGSTEngine()
manager := stream.NewManager(engine, nil, nil)
适用场景:需要完整媒体处理能力,包括转码、多画面拼接、复杂 filter 操作。
// 仅使用自定义 Source(无需 GStreamer)
manager := stream.NewManager(nil, customSource, nil)
适用场景:应用内生成帧、自定义协议拉流、无需复杂媒体处理的场景。
// Engine 处理复杂 pipeline,Source 提供输入
manager := stream.NewManager(engine, customSource, nil)
适用场景:需要自定义输入源,同时保留 GStreamer 的处理能力。
一键式 API,自动完成探测、选配、合并、创建全流程:
api := stream.NewOneCallAPI(manager, stream.OneCallConfig{
AutoSelection: boolPtr(true),
Metrics: boolPtr(true),
Events: boolPtr(true),
})
result := api.StartSession(stream.OneCallRequest{
ID: "session-01",
URI: "rtsp://camera/live",
SourceType: stream.SourceTypeRTSP,
Preset: stream.PresetPushRTMPLowLatency,
})
readiness := api.CheckStartupReadiness(req)
if !readiness.Ready {
for _, check := range readiness.Checks {
if check.Status == stream.StartupCheckFailed {
fmt.Printf("%s: %s\n", check.Name, check.Message)
}
}
}
// 启动会话
session, err := manager.StartSession(ctx, spec)
// 停止会话
session.Stop()
// 重建会话(热更新)
session.Rebuild(ctx, newSpec)
// 订阅帧
subID, frameCh := session.Subscribe(stream.SubscribeOptions{
BufferSize: 32,
DropPolicy: stream.DropPolicyKeyframeAware,
KeyFrameOnly: false,
})
// 取消订阅
session.Unsubscribe(subID)
// 获取统计
stats := session.Stats()
// 健康评分(0-100)
state, window := session.HealthSnapshot()
fmt.Printf("评分: %d\n", state.Score)
fmt.Printf("可用性: %d, 质量: %d, 饱和度: %d\n",
state.Breakdown.AvailabilityScore,
state.Breakdown.QualityScore,
state.Breakdown.SaturationScore)
// 连通性状态
switch state.Connectivity {
case stream.ConnectivityHealthy:
// 正常运行
case stream.ConnectivityDegraded:
// 间歇性问题
case stream.ConnectivityUnhealthy:
// 持续失败
}
支持三种布局模式:
| 模式 | 说明 | 路数限制 |
|---|---|---|
mosaic | 网格拼接 | 2-16 路 |
pip | 画中画 | 固定 2 路 |
manual | 手工布局 | 2-16 路 |
spec := stream.SessionSpec{
ID: "composite-4x4",
Pipeline: stream.PipelineSpec{
Composite: stream.CompositeSpec{
Enabled: true,
Mode: stream.CompositeModeManual,
Width: 1920,
Height: 1080,
FPS: 25,
Background: "black",
AudioMode: stream.CompositeAudioFirst,
Inputs: []stream.CompositeInputSpec{
{
ID: "1",
Type: stream.SourceTypeRTSP,
URI: "rtsp://192.168.1.101/stream1",
X: 0, Y: 0, Width: 960, Height: 540,
},
{
ID: "2",
Type: stream.SourceTypeRTSP,
URI: "rtsp://192.168.1.102/stream2",
X: 960, Y: 0, Width: 960, Height: 540,
},
// ... 更多输入
},
},
},
}
session, err := manager.StartSession(ctx, spec)
stream.CompositeSpec{
Enabled: true,
Mode: stream.CompositeModePIP,
Width: 1920,
Height: 1080,
Inputs: []stream.CompositeInputSpec{
// 主画面
{
ID: "main", Type: stream.SourceTypeRTSP,
URI: "rtsp://cam-main/live",
X: 0, Y: 0, Width: 1920, Height: 1080, ZOrder: 0,
},
// 小窗
{
ID: "pip", Type: stream.SourceTypeRTSP,
URI: "rtsp://cam-guest/live",
X: 1440, Y: 810, Width: 480, Height: 270, ZOrder: 10,
},
},
}
newLayout := spec.Pipeline.Composite
newLayout.Inputs[0].URI = "rtsp://new-camera/live"
err := manager.UpdateCompositeLayout(ctx, sessionID, newLayout)
按 URL 自动去重,单一连接多订阅者:
hub := stream.NewStreamHub(manager)
// 三个消费者订阅同一流
const url = "rtsp://192.168.1.101/stream"
_, ch1, rel1, _ := hub.Subscribe(ctx, url, stream.SubscribeOptions{})
defer rel1()
_, ch2, rel2, _ := hub.Subscribe(ctx, url, stream.SubscribeOptions{})
defer rel2()
go detectObjects(ch1)
go analyzePose(ch2)
// 截图预览(无需订阅)
frame, _ := hub.LastVideoFrame(url)
// 零开销截图
frame := session.LastVideoFrame()
if frame != nil {
saveJPEG(frame.Payload)
}
// 等待下一关键帧
frame, err := session.WaitForKeyFrame(ctx, 2*time.Second)
// 批量收集
samples, _ := session.Drain(ctx, 5, stream.SubscribeOptions{
KeyFrameOnly: true,
})
支持多厂商硬件编解码:
| 厂商 | 加速器 | 编码器 |
|---|---|---|
| NVIDIA | NVENC | nvh264enc, nvh265enc |
| Intel | QSV/VAAPI | qsvh264enc, vaapih264enc |
| AMD | AMF/VAAPI | amfh264enc, vaapih264enc |
| Rockchip | MPP/V4L2 | mpph264enc, v4l2h264enc |
result := api.StartSession(stream.OneCallRequest{
ID: "encoding-test",
URI: "rtsp://camera/live",
SourceType: stream.SourceTypeRTSP,
Preset: stream.PresetTranscodeStandardH264AAC,
Constraints: stream.ProfileConstraint{
RequireHWAccel: true,
},
})
events := manager.EventBus()
events.Subscribe("monitor", stream.EventHandlerFunc(func(e stream.Event) {
switch e.Type {
case stream.EventError:
fmt.Printf("错误: %s\n", e.Message)
case stream.EventHealthScore:
fmt.Printf("健康分: %v\n", e.Data)
}
}))
metrics := manager.Metrics()
promText := metrics.ExportPrometheusText()
// 配置时间序列聚合器
cfg := stream.TimeSeriesConfig{
Enabled: true,
SampleInterval: 30 * time.Second,
MaxAge: 7 * 24 * time.Hour,
}
manager.SetTimeSeriesConfig(ctx, cfg)
// 查询历史统计
end := time.Now()
start := end.Add(-1 * time.Hour)
result, _ := manager.QueryTimeRange("session-id", start, end)
fmt.Printf("平均码率: %.2f Kbps\n", result.AvgBitrateKbps)
fmt.Printf("平均健康分: %.2f\n", result.AvgHealthScore)
自定义 Source 允许从任意数据源获取帧数据,适用于:
// 基础 Source 接口
type Source interface {
Open(ctx context.Context, spec SourceSpec) (SourceSession, error)
}
// 可感知完整会话规格的 Source(推荐)
type SessionAwareSource interface {
OpenSession(ctx context.Context, spec SessionSpec) (SourceSession, error)
}
// SourceSession 必须实现
type SourceSession interface {
ReadFrame(ctx context.Context) (*Frame, error)
Close() error
}
// ImageSequenceSource 从图片文件序列读取帧
type ImageSequenceSource struct {
directory string
pattern string // 如 "frame_%04d.jpg"
fps int
}
func (s *ImageSequenceSource) Open(ctx context.Context, spec stream.SourceSpec) (stream.SourceSession, error) {
return &imageSeqSession{
source: s,
index: 0,
lastTime: time.Now(),
interval: time.Second / time.Duration(s.fps),
}, nil
}
type imageSeqSession struct {
source *ImageSequenceSource
index int
lastTime time.Time
interval time.Duration
}
func (s *imageSeqSession) ReadFrame(ctx context.Context) (*stream.Frame, error) {
// 控制帧率
elapsed := time.Since(s.lastTime)
if elapsed < s.interval {
time.Sleep(s.interval - elapsed)
}
s.lastTime = time.Now()
// 构造文件路径并读取
filename := fmt.Sprintf(filepath.Join(s.source.directory, s.source.pattern), s.index)
s.index++
// 读取并解码图片(此处省略具体实现)
data, err := os.ReadFile(filename)
if err != nil {
return nil, err
}
return &stream.Frame{
SourceID: "image-seq",
Type: stream.MediaTypeVideo,
Timestamp: time.Now(),
IsKeyFrame: true,
Payload: data,
}, nil
}
func (s *imageSeqSession) Close() error {
return nil
}
source := &ImageSequenceSource{
directory: "/path/to/frames",
pattern: "frame_%04d.jpg",
fps: 25,
}
manager := stream.NewManager(nil, source, nil)
defer manager.Close()
session, err := manager.StartSession(ctx, stream.SessionSpec{
ID: "image-sequence-session",
Pipeline: stream.PipelineSpec{
Pull: stream.SourceSpec{
ID: "image-seq",
Type: stream.SourceTypeApp,
},
},
})
自定义 Sink 允许将帧数据输出到任意目标,适用于:
type Sink interface {
Open(ctx context.Context, spec SinkSpec) (SinkSession, error)
}
type SinkSession interface {
WriteFrame(ctx context.Context, frame *Frame) error
Close() error
}
// FileRecorderSink 将帧录制到文件
type FileRecorderSink struct {
outputDir string
}
func (s *FileRecorderSink) Open(ctx context.Context, spec stream.SinkSpec) (stream.SinkSession, error) {
outputPath := filepath.Join(s.outputDir, spec.ID+".mkv")
return &fileRecorderSession{
outputPath: outputPath,
// 初始化录制器...
}, nil
}
type fileRecorderSession struct {
outputPath string
// 录制器状态...
}
func (s *fileRecorderSession) WriteFrame(ctx context.Context, frame *stream.Frame) error {
// 写入帧到文件
return nil
}
func (s *fileRecorderSession) Close() error {
// 关闭文件并完成录制
return nil
}
对于完全自定义的媒体处理管道,可以实现 Engine 接口:
type Engine interface {
Name() string
Start(ctx context.Context, runtime EngineRuntime) (EngineSession, error)
}
type EngineSession interface {
Rebuild(ctx context.Context, pipeline PipelineSpec, fusion FusionSpec) error
Stop(ctx context.Context) error
}
注意:自定义 Engine 仅在需要完全替代 GStreamer 时使用,大多数场景下自定义 Source/Sink 已足够。
manager := stream.NewManager(engine, source, sink)
manager.SetMaxSessions(2048)
manager.Metrics().SetMaxSeries(50000)
api := stream.NewOneCallAPI(manager, stream.OneCallConfig{
AutoSelection: boolPtr(true),
Metrics: boolPtr(true),
Events: boolPtr(true),
})
result := api.StartSession(req)
if result.Error != nil {
switch result.ErrorReason {
case stream.OneCallReasonRuntimeMissingDependency:
// 缺少运行时依赖
case stream.OneCallReasonSessionAlreadyExists:
// 会话已存在
case stream.OneCallReasonHardwareForcedUnavailable:
// 硬件加速不可用
}
}
// 设置检查点存储
manager.SetCheckpointStore(stream.NewFileCheckpointStore("/data/checkpoints"))
// 进程重启后恢复
recovered, err := manager.RecoverSessions(context.Background())
| 文档 | 说明 |
|---|---|
| DESIGN.md | 架构设计文档 |
| docs/ARCHITECTURE.md | 详细架构说明 |
| docs/API_REFERENCE.md | API 参考 |
| inference/README.md | AI 推理集成 |
| lowlatency/README.md | 低延迟传输 |
| examples/README.md | 示例集合 |
MIT License