logo
0
0
WeChat Login
docs: 精简并重构 README 文档

stream/lowlatency

低延迟传输子包 — 统一封装 SRT、WebRTC、RTMP 三种传输协议的 Pipeline 构建、自适应延迟优化与抖动缓冲管理。

特性

  • 多协议 Pipeline 编排 — SRT(srtsrc)、WebRTC(webrtcbin)、RTMP(rtmpsrc)统一入口
  • 自适应延迟优化 — 基于会话健康状态和网络质量动态调整目标延迟,收敛到配置区间
  • 统计抖动缓冲 — 基于到达间隔均值 + 2σ 估算目标延迟,平滑网络抖动
  • 传输层统计 — SRT 链路延迟/丢包率、WebRTC RTT/可用带宽/候选对状态
  • 事件驱动 — 通过 EventBus 发布延迟优化事件,便于观测面集成

架构

┌─────────────────────────────────────────────────────────────┐
│                 LowLatencyPipeline                          │
│  ┌─────────────┐  ┌─────────────────┐  ┌─────────────────┐ │
│  │ Transport   │  │ LatencyOptimizer│  │ JitterBuffer    │ │
│  │ (SRT/WebRTC │  │                 │  │                 │ │
│  │  /RTMP)     │  │ measureLatency  │  │ avg + 2σ        │ │
│  │             │  │  → Optimize     │  │  → targetDelay  │ │
│  └──────┬──────┘  └────────┬────────┘  └────────┬────────┘ │
│         │                  │                    │          │
│         ▼                  ▼                    ▼          │
│  BuildPipeline()    ApplyTargetLatency()   Push/PopFrame   │
└─────────────────────────────────────────────────────────────┘

核心概念

延迟优化器(LatencyOptimizer)

targetLatency 为收敛基准(而非 currentLatency),通过双 ticker 循环运行:

  • 测量 ticker — 定期估算当前延迟(综合带宽、健康评分、连接状态)
  • 优化 ticker — 当 currentLatency 偏离 targetLatency 超过 AdaptStep 时,触发缓冲增减

抖动缓冲(JitterBuffer)

基于到达间隔的统计分析:

targetDelay = avgInterval + 2 * stdDev   // 覆盖 95% 样本
targetDelay = clamp(targetDelay, MinDelay, MaxDelay)
targetDelay = smooth(targetDelay, old, weight=0.7)  // 加权平滑

Push 时记录到达时间,Pop 时根据 targetDelay 判断是否可以返回帧。

快速开始

SRT 传输

import (
    "cnb.cool/svn/stream"
    "cnb.cool/svn/stream/lowlatency"
)

events := stream.NewEventBus(stream.EventBusConfig{})
defer events.Close()

pipeline := lowlatency.NewLowLatencyPipeline(
    lowlatency.LowLatencyConfig{
        TransportType: "srt",
        SRTConfig: lowlatency.SRTConfig{
            Latency:      100,  // 100ms
            MaxBandwidth: 5000, // 5Mbps
            OverheadBW:   25,   // 25% 开销
            StreamID:     "stream-01",
            Passphrase:   "your-password",
            KeySize:      128,  // AES-128
        },
        LatencyOptimizer: lowlatency.LatencyOptimizeConfig{
            TargetLatency:  100 * time.Millisecond,
            MinLatency:     20 * time.Millisecond,
            MaxLatency:     500 * time.Millisecond,
            EnableAdaptive: true,
        },
        JitterBuffer: lowlatency.JitterBufferConfig{
            InitialDelay: 50 * time.Millisecond,
            AdaptiveMode: true,
            WindowSize:   32,
        },
    },
    session,      // *stream.Session(满足 sessionProvider 接口)
    healthTracker, // *stream.HealthTracker
    events,
)

// 构建 GStreamer Pipeline 字符串
p, err := pipeline.BuildPipeline("srt://example.com:9000")

WebRTC 传输

pipeline := lowlatency.NewLowLatencyPipeline(
    lowlatency.LowLatencyConfig{
        TransportType: "webrtc",
        WebRTCConfig: lowlatency.WebRTCConfig{
            STUNServers:        []string{"stun:stun.l.google.com:19302"},
            LatencyMode:        lowlatency.WebRTCLatencyLow,
            Bandwidth:          2000, // 2Mbps
            BundlePolicy:       "max-bundle",
            ICETransportPolicy: "all",
        },
        LatencyOptimizer: lowlatency.LatencyOptimizeConfig{
            TargetLatency:  80 * time.Millisecond,
            EnableAdaptive: true,
        },
    },
    session, healthTracker, events,
)

p, err := pipeline.BuildPipeline("")

启动延迟优化循环

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// 启动自动优化(阻塞,需 goroutine)
go pipeline.Run(ctx)

// 手动触发一次优化
if err := pipeline.Optimize(ctx); err != nil {
    log.Printf("优化失败: %v", err)
}

帧级抖动缓冲

// 推送帧
if err := pipeline.PushFrame(frame); err != nil {
    log.Printf("入缓冲失败: %v", err)
}

// 按目标延迟弹出帧(可能返回 nil,表示未到播放时间)
frame := pipeline.PopFrame()
if frame != nil {
    // 处理帧
}

获取快照

snap := pipeline.Snapshot()
fmt.Printf("传输: %s, 应用延迟: %v, 当前延迟: %v, 缓冲大小: %d\n",
    snap.TransportType,
    snap.AppliedLatency,
    snap.CurrentLatency,
    snap.JitterBufferSize,
)

配置参考

SRTConfig

字段类型说明
Latencyint链路延迟(ms),建议 50-200
MaxBandwidthint最大带宽限制(kbps)
OverheadBWint开销带宽百分比(0-100)
PacketFilterstring数据包过滤器配置
StreamIDstringSRT 流标识
PassphrasestringAES 加密密码
KeySizeint密钥大小(128/192/256)
Encryptionint加密模式(0=无, 1=AES-128, 2=AES-192, 3=AES-256)
WorkerCountint工作线程数
Rendezvousbool是否使用 Rendezvous 模式

WebRTCConfig

字段类型说明
STUNServers[]stringSTUN 服务器列表
TURNServers[]TURNServerTURN 服务器列表(含用户名/密码)
Codecs[]string编码器列表
Bandwidthint带宽限制(kbps)
LatencyModeWebRTCLatencyMode延迟模式(见下表)
ICETransportPolicystringICE 传输策略:all / relay
BundlePolicystringBundle 策略:max-bundle / max-compat / balanced
RTCPMuxPolicystringRTCP 复用策略:require / negotiate

WebRTC 延迟模式

模式延迟场景
WebRTCLatencyUltraLow~30ms实时互动(视频会议、游戏)
WebRTCLatencyLow~80ms低延迟直播推流
WebRTCLatencyNormal~150ms普通传输

LatencyOptimizeConfig

字段类型默认值说明
TargetLatencytime.Duration100ms目标延迟
MinLatencytime.Duration20ms最小延迟下限
MaxLatencytime.Duration500ms最大延迟上限
AdaptSteptime.Duration10ms单次调整步长
MeasureIntervaltime.Duration100ms测量间隔
OptimizeIntervaltime.Duration200ms优化间隔
EnableAdaptiveboolfalse是否启用自适应优化

JitterBufferConfig

字段类型默认值说明
MinDelaytime.Duration20ms最小延迟
MaxDelaytime.Duration500ms最大延迟
InitialDelaytime.Duration50ms初始延迟
AdaptiveModeboolfalse是否自适应调整
WindowSizeint32统计窗口大小

性能对比

传输类型延迟范围带宽要求CPU 开销浏览器支持
SRT50-200ms中等
WebRTC30-150ms较高中等
RTMP200-2000ms

最佳实践

协议选择

场景推荐协议
实时互动(双向)WebRTC Ultra-Low
广播推流(单向)SRT
兼容性优先RTMP

延迟参数调优

网络状况调优策略
网络稳定降低 TargetLatency,启用自适应
网络抖动增大 MinDelayJitterBuffer.MaxDelay
带宽受限限制 SRTConfig.MaxBandwidthWebRTCConfig.Bandwidth

监控指标

// SRT 指标
stats := pipeline.GetTransportStats().(lowlatency.SRTStats)
fmt.Printf("配置延迟: %d, 丢包: %d, RTT: %d\n",
    stats.ConfiguredLatency, stats.PacketsLost, stats.RoundTripDelay)

// WebRTC 指标
stats := pipeline.GetTransportStats().(lowlatency.WebRTCStats)
fmt.Printf("可用带宽: %d, RTT: %d, 丢帧: %d\n",
    stats.AvailableBitrate, stats.RoundTripDelay, stats.FramesDropped)

// 全局指标
snap := pipeline.Snapshot()
fmt.Printf("缓冲大小: %d, 当前延迟: %v\n",
    snap.JitterBufferSize, snap.CurrentLatency)

安全传输

// SRT 加密
SRTConfig: lowlatency.SRTConfig{
    Passphrase: "your-strong-password",
    KeySize:    256,  // AES-256
}

// WebRTC TURN 认证
TURNServers: []lowlatency.TURNServer{
    {
        URL:      "turn:turn.example.com:3478",
        Username: "user",
        Password: "pass",
    },
}

测试

# 运行所有测试
cd /workspace
go test -v ./lowlatency/...

# 运行短测试(跳过耗时测试)
go test -v -short ./lowlatency/...

# 运行基准测试
go test -bench=. ./lowlatency/...

测试覆盖:

  • JitterBuffer — Push/Pop/自适应调整/清空/边界条件
  • RingBuffer — Push/Pop/Peek/覆盖丢弃/清空
  • SRTTransport — Pipeline 构建(含加密配置)/统计信息/延迟更新
  • WebRTCTransport — Pipeline 构建/默认配置/统计信息/延迟模式切换
  • LatencyOptimizer — 生命周期/减少缓冲/增加缓冲/边界限制/非自适应模式
  • LowLatencyPipeline — SRT/WebRTC/RTMP/不支持类型/延迟模式推荐值/优化回写/帧推拉

目录结构

lowlatency/
├── DESIGN.md                    # 架构设计文档
├── README.md                    # 本文档
├── lowlatency_pipeline.go       # 主编排器(传输 + 优化 + 缓冲)
├── lowlatency_pipeline_test.go  # Pipeline 集成测试
├── srt_transport.go             # SRT 传输协议
├── srt_transport_test.go        # SRT 单元测试
├── webrtc_transport.go          # WebRTC 传输协议
├── webrtc_transport_test.go     # WebRTC 单元测试
├── latency_optimizer.go         # 自适应延迟优化器
├── latency_optimizer_test.go    # 优化器单元测试
├── jitter_buffer.go             # 抖动缓冲 + 环形缓冲
└── jitter_buffer_test.go        # 抖动缓冲单元测试

核心设计约束

  • LatencyOptimizer.Optimize()targetLatency 为调整基准,确保优化目标收敛到 [MinLatency, MaxLatency] 区间
  • sessionProvider 接口仅要求 ID() stringHealthSnapshot(),便于测试注入 mock
  • JitterBuffer.adjustBufferSize() 使用统计学方法(avg + 2σ)估算目标延迟,并通过加权平滑避免抖动
  • ApplyTargetLatency() 统一回写 transport 配置和 jitter buffer 目标延迟,确保三层状态一致

依赖

  • Go 1.22+
  • cnb.cool/svn/stream(父模块)
  • github.com/pion/webrtc/v4(WebRTC PeerConnection)
  • GStreamer 1.20+(带 SRT/WebRTC 插件,仅运行时)

许可证

MIT License