低延迟传输子包 — 统一封装 SRT、WebRTC、RTMP 三种传输协议的 Pipeline 构建、自适应延迟优化与抖动缓冲管理。
- 多协议 Pipeline 编排 — SRT(
srtsrc)、WebRTC(webrtcbin)、RTMP(rtmpsrc)统一入口
- 自适应延迟优化 — 基于会话健康状态和网络质量动态调整目标延迟,收敛到配置区间
- 统计抖动缓冲 — 基于到达间隔均值 + 2σ 估算目标延迟,平滑网络抖动
- 传输层统计 — SRT 链路延迟/丢包率、WebRTC RTT/可用带宽/候选对状态
- 事件驱动 — 通过
EventBus 发布延迟优化事件,便于观测面集成
┌─────────────────────────────────────────────────────────────┐
│ LowLatencyPipeline │
│ ┌─────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Transport │ │ LatencyOptimizer│ │ JitterBuffer │ │
│ │ (SRT/WebRTC │ │ │ │ │ │
│ │ /RTMP) │ │ measureLatency │ │ avg + 2σ │ │
│ │ │ │ → Optimize │ │ → targetDelay │ │
│ └──────┬──────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ BuildPipeline() ApplyTargetLatency() Push/PopFrame │
└─────────────────────────────────────────────────────────────┘
以 targetLatency 为收敛基准(而非 currentLatency),通过双 ticker 循环运行:
- 测量 ticker — 定期估算当前延迟(综合带宽、健康评分、连接状态)
- 优化 ticker — 当
currentLatency 偏离 targetLatency 超过 AdaptStep 时,触发缓冲增减
基于到达间隔的统计分析:
targetDelay = avgInterval + 2 * stdDev // 覆盖 95% 样本
targetDelay = clamp(targetDelay, MinDelay, MaxDelay)
targetDelay = smooth(targetDelay, old, weight=0.7) // 加权平滑
Push 时记录到达时间,Pop 时根据 targetDelay 判断是否可以返回帧。
import (
"cnb.cool/svn/stream"
"cnb.cool/svn/stream/lowlatency"
)
events := stream.NewEventBus(stream.EventBusConfig{})
defer events.Close()
pipeline := lowlatency.NewLowLatencyPipeline(
lowlatency.LowLatencyConfig{
TransportType: "srt",
SRTConfig: lowlatency.SRTConfig{
Latency: 100,
MaxBandwidth: 5000,
OverheadBW: 25,
StreamID: "stream-01",
Passphrase: "your-password",
KeySize: 128,
},
LatencyOptimizer: lowlatency.LatencyOptimizeConfig{
TargetLatency: 100 * time.Millisecond,
MinLatency: 20 * time.Millisecond,
MaxLatency: 500 * time.Millisecond,
EnableAdaptive: true,
},
JitterBuffer: lowlatency.JitterBufferConfig{
InitialDelay: 50 * time.Millisecond,
AdaptiveMode: true,
WindowSize: 32,
},
},
session,
healthTracker,
events,
)
p, err := pipeline.BuildPipeline("srt://example.com:9000")
pipeline := lowlatency.NewLowLatencyPipeline(
lowlatency.LowLatencyConfig{
TransportType: "webrtc",
WebRTCConfig: lowlatency.WebRTCConfig{
STUNServers: []string{"stun:stun.l.google.com:19302"},
LatencyMode: lowlatency.WebRTCLatencyLow,
Bandwidth: 2000,
BundlePolicy: "max-bundle",
ICETransportPolicy: "all",
},
LatencyOptimizer: lowlatency.LatencyOptimizeConfig{
TargetLatency: 80 * time.Millisecond,
EnableAdaptive: true,
},
},
session, healthTracker, events,
)
p, err := pipeline.BuildPipeline("")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go pipeline.Run(ctx)
if err := pipeline.Optimize(ctx); err != nil {
log.Printf("优化失败: %v", err)
}
if err := pipeline.PushFrame(frame); err != nil {
log.Printf("入缓冲失败: %v", err)
}
frame := pipeline.PopFrame()
if frame != nil {
}
snap := pipeline.Snapshot()
fmt.Printf("传输: %s, 应用延迟: %v, 当前延迟: %v, 缓冲大小: %d\n",
snap.TransportType,
snap.AppliedLatency,
snap.CurrentLatency,
snap.JitterBufferSize,
)
| 字段 | 类型 | 说明 |
|---|
Latency | int | 链路延迟(ms),建议 50-200 |
MaxBandwidth | int | 最大带宽限制(kbps) |
OverheadBW | int | 开销带宽百分比(0-100) |
PacketFilter | string | 数据包过滤器配置 |
StreamID | string | SRT 流标识 |
Passphrase | string | AES 加密密码 |
KeySize | int | 密钥大小(128/192/256) |
Encryption | int | 加密模式(0=无, 1=AES-128, 2=AES-192, 3=AES-256) |
WorkerCount | int | 工作线程数 |
Rendezvous | bool | 是否使用 Rendezvous 模式 |
| 字段 | 类型 | 说明 |
|---|
STUNServers | []string | STUN 服务器列表 |
TURNServers | []TURNServer | TURN 服务器列表(含用户名/密码) |
Codecs | []string | 编码器列表 |
Bandwidth | int | 带宽限制(kbps) |
LatencyMode | WebRTCLatencyMode | 延迟模式(见下表) |
ICETransportPolicy | string | ICE 传输策略:all / relay |
BundlePolicy | string | Bundle 策略:max-bundle / max-compat / balanced |
RTCPMuxPolicy | string | RTCP 复用策略:require / negotiate |
| 模式 | 延迟 | 场景 |
|---|
WebRTCLatencyUltraLow | ~30ms | 实时互动(视频会议、游戏) |
WebRTCLatencyLow | ~80ms | 低延迟直播推流 |
WebRTCLatencyNormal | ~150ms | 普通传输 |
| 字段 | 类型 | 默认值 | 说明 |
|---|
TargetLatency | time.Duration | 100ms | 目标延迟 |
MinLatency | time.Duration | 20ms | 最小延迟下限 |
MaxLatency | time.Duration | 500ms | 最大延迟上限 |
AdaptStep | time.Duration | 10ms | 单次调整步长 |
MeasureInterval | time.Duration | 100ms | 测量间隔 |
OptimizeInterval | time.Duration | 200ms | 优化间隔 |
EnableAdaptive | bool | false | 是否启用自适应优化 |
| 字段 | 类型 | 默认值 | 说明 |
|---|
MinDelay | time.Duration | 20ms | 最小延迟 |
MaxDelay | time.Duration | 500ms | 最大延迟 |
InitialDelay | time.Duration | 50ms | 初始延迟 |
AdaptiveMode | bool | false | 是否自适应调整 |
WindowSize | int | 32 | 统计窗口大小 |
| 传输类型 | 延迟范围 | 带宽要求 | CPU 开销 | 浏览器支持 |
|---|
| SRT | 50-200ms | 中等 | 低 | 否 |
| WebRTC | 30-150ms | 较高 | 中等 | 是 |
| RTMP | 200-2000ms | 低 | 低 | 否 |
| 场景 | 推荐协议 |
|---|
| 实时互动(双向) | WebRTC Ultra-Low |
| 广播推流(单向) | SRT |
| 兼容性优先 | RTMP |
| 网络状况 | 调优策略 |
|---|
| 网络稳定 | 降低 TargetLatency,启用自适应 |
| 网络抖动 | 增大 MinDelay 和 JitterBuffer.MaxDelay |
| 带宽受限 | 限制 SRTConfig.MaxBandwidth 或 WebRTCConfig.Bandwidth |
stats := pipeline.GetTransportStats().(lowlatency.SRTStats)
fmt.Printf("配置延迟: %d, 丢包: %d, RTT: %d\n",
stats.ConfiguredLatency, stats.PacketsLost, stats.RoundTripDelay)
stats := pipeline.GetTransportStats().(lowlatency.WebRTCStats)
fmt.Printf("可用带宽: %d, RTT: %d, 丢帧: %d\n",
stats.AvailableBitrate, stats.RoundTripDelay, stats.FramesDropped)
snap := pipeline.Snapshot()
fmt.Printf("缓冲大小: %d, 当前延迟: %v\n",
snap.JitterBufferSize, snap.CurrentLatency)
SRTConfig: lowlatency.SRTConfig{
Passphrase: "your-strong-password",
KeySize: 256,
}
TURNServers: []lowlatency.TURNServer{
{
URL: "turn:turn.example.com:3478",
Username: "user",
Password: "pass",
},
}
cd /workspace
go test -v ./lowlatency/...
go test -v -short ./lowlatency/...
go test -bench=. ./lowlatency/...
测试覆盖:
JitterBuffer — Push/Pop/自适应调整/清空/边界条件
RingBuffer — Push/Pop/Peek/覆盖丢弃/清空
SRTTransport — Pipeline 构建(含加密配置)/统计信息/延迟更新
WebRTCTransport — Pipeline 构建/默认配置/统计信息/延迟模式切换
LatencyOptimizer — 生命周期/减少缓冲/增加缓冲/边界限制/非自适应模式
LowLatencyPipeline — SRT/WebRTC/RTMP/不支持类型/延迟模式推荐值/优化回写/帧推拉
lowlatency/
├── DESIGN.md # 架构设计文档
├── README.md # 本文档
├── lowlatency_pipeline.go # 主编排器(传输 + 优化 + 缓冲)
├── lowlatency_pipeline_test.go # Pipeline 集成测试
├── srt_transport.go # SRT 传输协议
├── srt_transport_test.go # SRT 单元测试
├── webrtc_transport.go # WebRTC 传输协议
├── webrtc_transport_test.go # WebRTC 单元测试
├── latency_optimizer.go # 自适应延迟优化器
├── latency_optimizer_test.go # 优化器单元测试
├── jitter_buffer.go # 抖动缓冲 + 环形缓冲
└── jitter_buffer_test.go # 抖动缓冲单元测试
LatencyOptimizer.Optimize() 以 targetLatency 为调整基准,确保优化目标收敛到 [MinLatency, MaxLatency] 区间
sessionProvider 接口仅要求 ID() string 和 HealthSnapshot(),便于测试注入 mock
JitterBuffer.adjustBufferSize() 使用统计学方法(avg + 2σ)估算目标延迟,并通过加权平滑避免抖动
ApplyTargetLatency() 统一回写 transport 配置和 jitter buffer 目标延迟,确保三层状态一致
- Go 1.22+
cnb.cool/svn/stream(父模块)
github.com/pion/webrtc/v4(WebRTC PeerConnection)
- GStreamer 1.20+(带 SRT/WebRTC 插件,仅运行时)
MIT License