balancer 是 Yamux 的 session pool 子包,用于在多条 Yamux session 之间安全地打开新的 logical stream。它适合 WebSocket tunnel、多边缘节点接入、反向代理网关、长连接客户端集群等场景。
*yamux.Sessionround-robin、random、weighted、least-conn、affinitySessionKey、SessionID、RequiredMetadata 缩窄候选集Stream 包装器自动维护 active stream 计数AffinityKey / SourceIP sticky routing、TTL 清理与容量上限package main
import (
"context"
"net"
"time"
"cnb.cool/zishuo/yamux"
"cnb.cool/zishuo/yamux/balancer"
)
func buildPool() (*balancer.Pool, error) {
strategy := balancer.Affinity(
balancer.WithAffinityTTL(10*time.Minute),
balancer.WithAffinityFallback(balancer.LeastConn()),
)
return balancer.New(balancer.Config{
Strategy: strategy,
HealthFailureThreshold: 3,
HealthProbeInterval: 30 * time.Second,
})
}
func register(pool *balancer.Pool, conn net.Conn) (*balancer.SessionRef, error) {
session, err := yamux.Server(conn, nil)
if err != nil {
return nil, err
}
return pool.Add(session, balancer.SessionOptions{
ID: "node-a",
Label: "edge-a",
Weight: 3,
MaxActiveStreams: 128,
})
}
func openForTenant(pool *balancer.Pool, tenant string) (net.Conn, error) {
return pool.Open(context.Background(), balancer.RoutingContext{
AffinityKey: tenant,
})
}
func rotateNode(ref *balancer.SessionRef) {
ref.Drain("rolling update")
defer ref.Resume()
}
func openForZone(pool *balancer.Pool, zone string) (net.Conn, error) {
return pool.Open(context.Background(), balancer.RoutingContext{
SessionID: "edge",
RequiredMetadata: map[string]string{
"zone": zone,
},
})
}
func tuneNode(ref *balancer.SessionRef) {
_, _ = ref.Update(
balancer.WithSessionWeight(5),
balancer.MergeSessionMetadata(map[string]string{"canary": "true"}),
)
}
balancer 不直接处理 WebSocket upgrade。调用方应先用 websocket 子包把底层 message 连接变成 Yamux session,再注册到池中:
import (
"cnb.cool/zishuo/yamux/balancer"
yws "cnb.cool/zishuo/yamux/websocket"
)
func handleTunnel(pool *balancer.Pool, raw yws.FramedConn, clientID string, weight int) error {
session, err := yws.Server(raw, nil, nil)
if err != nil {
return err
}
ref, err := pool.Add(session, balancer.SessionOptions{
ID: clientID,
Weight: weight,
})
if err != nil {
session.Close()
return err
}
go func() {
<-session.CloseChan()
ref.Remove()
}()
return nil
}
Pool.Add已会监听Session.CloseChan()并自动摘除,示例中的 goroutine 只展示显式生命周期接线;多数场景可省略。
balancer.RoundRobin()
balancer.Random()
balancer.Weighted()
balancer.LeastConn()
balancer.Affinity(
balancer.WithAffinityTTL(5*time.Minute),
balancer.WithAffinityFallback(balancer.RoundRobin()),
)
也可以通过名称创建:
strategy, err := balancer.NewStrategy(
balancer.StrategyAffinity,
balancer.WithStrategyAffinityTTL(5*time.Minute),
balancer.WithStrategyFallback(balancer.Weighted()),
)
StrategyRoundRobin、StrategyRandom、StrategyWeighted、StrategyLeastConn、StrategyAffinity
均为 StrategyName 常量,调用方无需手写策略字符串。确定配置可信时,也可使用 MustStrategy:
pool, err := balancer.New(balancer.Config{ Strategy: balancer.MustStrategy(balancer.StrategyLeastConn), })
| 策略 | 语义 | 适用场景 |
|---|---|---|
round-robin | 候选 session 轮询 | 默认均衡 |
random | 随机选择 | 避免中心化计数 |
weighted | 平滑加权轮询 | 节点规格不同 |
least-conn | 选择 active stream 最少者 | 请求耗时差异大 |
affinity | 按 key sticky 到同一 session | 租户 / 用户亲和 |
RoutingContext 除了 AffinityKey 外,还支持在进入负载均衡策略前先过滤候选集:
SessionKey:定向到 Pool 内唯一 session keySessionID:定向到某组调用方定义的节点身份RequiredMetadata:要求候选 session 至少包含给定键值对这些约束是 AND 关系;若缩窄后没有任何候选,会返回 ErrNoMatchingSession。
OpenStream 失败会记录 session 失败Stream.Read / Stream.Write 会按错误分类记录健康相关失败io.EOF、net.ErrClosed、deadline timeout、stream reset 不视为 session 健康失败HealthFailureThreshold 后,session 暂时退出候选集HealthProbeInterval 后,不健康 session 会重新进入探测候选集Drain 的 session 不会参与新 stream 调度,但不会影响已有 streamMaxActiveStreams 达到上限后,session 会暂时退出候选集,待已有 stream 关闭后自动恢复CloseFailedSessions=false 为默认值;网关隧道场景可开启以快速触发 client 重连如需按业务语义覆盖默认规则,可提供 FailureClassifier:
pool, _ := balancer.New(balancer.Config{
FailureClassifier: func(ctx balancer.FailureContext) balancer.FailureDecision {
if ctx.Operation == balancer.FailureOperationStreamWrite &&
errors.Is(ctx.Err, os.ErrDeadlineExceeded) {
return balancer.FailureDecisionCount
}
return balancer.FailureDecisionIgnore
},
})
Affinity() 默认使用更安全的有界配置:
| 配置 | 默认值 | 说明 |
|---|---|---|
DefaultAffinityTTL | 10m | binding 10 分钟后过期 |
DefaultAffinityMaxBindings | 65536 | 超出上限时淘汰最久未更新 binding |
需要禁用过期或容量上限时必须显式声明:
strategy := balancer.Affinity(
balancer.WithAffinityTTL(0),
balancer.WithAffinityMaxBindings(0),
)
pool, _ := balancer.New(balancer.Config{
Hooks: balancer.Hooks{
OnHealthChange: func(event balancer.HealthEvent) {
log.Printf("session=%s healthy=%v reason=%s",
event.Snapshot.ID, event.Healthy, event.Reason)
},
},
})
stats := pool.Stats()
log.Printf("available=%d healthy=%d active=%d",
stats.SessionsAvailable, stats.SessionsHealthy, stats.ActiveStreams)
Stats 包含 session 数、可调度数、维护态数、容量受限数、健康数、探测候选数、active/pending streams、累计 open attempt/failure、affinity binding 统计与每个 session 的快照。
当调用 DrainSession / SessionRef.Drain 时,快照中的 Draining、DrainReason、DrainStartedAt 与 SessionsDraining 会同步反映维护态。
Hook panic 会被 Pool 隔离,并通过 OnHookPanic 回调暴露,避免第三方观测回调击穿调度主流程。
调用方可以在 session 存活期间动态调整其可变属性:
snapshot, err := pool.UpdateSession(
ref.Key,
balancer.WithSessionLabel("edge-canary"),
balancer.WithSessionWeight(8),
balancer.MergeSessionMetadata(map[string]string{
"version": "v2",
}),
)
当前支持:
WithSessionLabelWithSessionWeightReplaceSessionMetadataMergeSessionMetadataDeleteSessionMetadataKeysPool.Open / OpenStream 拿到的连接必须及时 Close()。Strategy 必须 goroutine-safe,并只返回合法候选下标;panic 会被转换为 ErrInvalidStrategy。balancer 只负责选择 session,不理解业务语义。