logo
0
0
WeChat Login
docs: 统一中文文档,修正中英文混用

balancer

balancer 是 Yamux 的 session pool 子包,用于在多条 Yamux session 之间安全地打开新的 logical stream。它适合 WebSocket tunnel、多边缘节点接入、反向代理网关、长连接客户端集群等场景。

核心特性

  • 独立于 transport:不绑定 TCP / WebSocket / HTTP 框架,直接管理 *yamux.Session
  • 多策略内置round-robinrandomweightedleast-connaffinity
  • 健康感知:连续失败后摘出候选集,冷却后自动探测恢复
  • 维护态控制:支持对单个 session 手动 Drain / Resume,便于滚动发布与故障隔离
  • 定向路由:支持按 SessionKeySessionIDRequiredMetadata 缩窄候选集
  • 并发上限保护:支持按 session 限制并发 stream 数,防止热点节点被压穿
  • 在线更新:支持动态调整 label / weight / metadata,无需摘除并重加 session
  • 活跃流统计Stream 包装器自动维护 active stream 计数
  • 亲和绑定:支持 AffinityKey / SourceIP sticky routing、TTL 清理与容量上限
  • 第三方友好:小接口、无第三方依赖、panic-safe hooks + stats 控制面

快速开始

package main import ( "context" "net" "time" "cnb.cool/zishuo/yamux" "cnb.cool/zishuo/yamux/balancer" ) func buildPool() (*balancer.Pool, error) { strategy := balancer.Affinity( balancer.WithAffinityTTL(10*time.Minute), balancer.WithAffinityFallback(balancer.LeastConn()), ) return balancer.New(balancer.Config{ Strategy: strategy, HealthFailureThreshold: 3, HealthProbeInterval: 30 * time.Second, }) } func register(pool *balancer.Pool, conn net.Conn) (*balancer.SessionRef, error) { session, err := yamux.Server(conn, nil) if err != nil { return nil, err } return pool.Add(session, balancer.SessionOptions{ ID: "node-a", Label: "edge-a", Weight: 3, MaxActiveStreams: 128, }) } func openForTenant(pool *balancer.Pool, tenant string) (net.Conn, error) { return pool.Open(context.Background(), balancer.RoutingContext{ AffinityKey: tenant, }) } func rotateNode(ref *balancer.SessionRef) { ref.Drain("rolling update") defer ref.Resume() } func openForZone(pool *balancer.Pool, zone string) (net.Conn, error) { return pool.Open(context.Background(), balancer.RoutingContext{ SessionID: "edge", RequiredMetadata: map[string]string{ "zone": zone, }, }) } func tuneNode(ref *balancer.SessionRef) { _, _ = ref.Update( balancer.WithSessionWeight(5), balancer.MergeSessionMetadata(map[string]string{"canary": "true"}), ) }

WebSocket tunnel 接入

balancer 不直接处理 WebSocket upgrade。调用方应先用 websocket 子包把底层 message 连接变成 Yamux session,再注册到池中:

import ( "cnb.cool/zishuo/yamux/balancer" yws "cnb.cool/zishuo/yamux/websocket" ) func handleTunnel(pool *balancer.Pool, raw yws.FramedConn, clientID string, weight int) error { session, err := yws.Server(raw, nil, nil) if err != nil { return err } ref, err := pool.Add(session, balancer.SessionOptions{ ID: clientID, Weight: weight, }) if err != nil { session.Close() return err } go func() { <-session.CloseChan() ref.Remove() }() return nil }

Pool.Add 已会监听 Session.CloseChan() 并自动摘除,示例中的 goroutine 只展示显式生命周期接线;多数场景可省略。

策略

balancer.RoundRobin() balancer.Random() balancer.Weighted() balancer.LeastConn() balancer.Affinity( balancer.WithAffinityTTL(5*time.Minute), balancer.WithAffinityFallback(balancer.RoundRobin()), )

也可以通过名称创建:

strategy, err := balancer.NewStrategy( balancer.StrategyAffinity, balancer.WithStrategyAffinityTTL(5*time.Minute), balancer.WithStrategyFallback(balancer.Weighted()), )

StrategyRoundRobinStrategyRandomStrategyWeightedStrategyLeastConnStrategyAffinity 均为 StrategyName 常量,调用方无需手写策略字符串。确定配置可信时,也可使用 MustStrategy

pool, err := balancer.New(balancer.Config{ Strategy: balancer.MustStrategy(balancer.StrategyLeastConn), })

策略语义

策略语义适用场景
round-robin候选 session 轮询默认均衡
random随机选择避免中心化计数
weighted平滑加权轮询节点规格不同
least-conn选择 active stream 最少者请求耗时差异大
affinity按 key sticky 到同一 session租户 / 用户亲和

路由约束

RoutingContext 除了 AffinityKey 外,还支持在进入负载均衡策略前先过滤候选集:

  • SessionKey:定向到 Pool 内唯一 session key
  • SessionID:定向到某组调用方定义的节点身份
  • RequiredMetadata:要求候选 session 至少包含给定键值对

这些约束是 AND 关系;若缩窄后没有任何候选,会返回 ErrNoMatchingSession

健康与失败模型

  • OpenStream 失败会记录 session 失败
  • Stream.Read / Stream.Write 会按错误分类记录健康相关失败
  • io.EOFnet.ErrClosed、deadline timeout、stream reset 不视为 session 健康失败
  • 连续失败达到 HealthFailureThreshold 后,session 暂时退出候选集
  • 冷却达到 HealthProbeInterval 后,不健康 session 会重新进入探测候选集
  • 已关闭或收到 remote GoAway 的 session 不会参与新 stream 调度
  • 手动 Drain 的 session 不会参与新 stream 调度,但不会影响已有 stream
  • MaxActiveStreams 达到上限后,session 会暂时退出候选集,待已有 stream 关闭后自动恢复
  • CloseFailedSessions=false 为默认值;网关隧道场景可开启以快速触发 client 重连

如需按业务语义覆盖默认规则,可提供 FailureClassifier

pool, _ := balancer.New(balancer.Config{ FailureClassifier: func(ctx balancer.FailureContext) balancer.FailureDecision { if ctx.Operation == balancer.FailureOperationStreamWrite && errors.Is(ctx.Err, os.ErrDeadlineExceeded) { return balancer.FailureDecisionCount } return balancer.FailureDecisionIgnore }, })

Affinity 默认值

Affinity() 默认使用更安全的有界配置:

配置默认值说明
DefaultAffinityTTL10mbinding 10 分钟后过期
DefaultAffinityMaxBindings65536超出上限时淘汰最久未更新 binding

需要禁用过期或容量上限时必须显式声明:

strategy := balancer.Affinity( balancer.WithAffinityTTL(0), balancer.WithAffinityMaxBindings(0), )

运行时快照与 Hooks

pool, _ := balancer.New(balancer.Config{ Hooks: balancer.Hooks{ OnHealthChange: func(event balancer.HealthEvent) { log.Printf("session=%s healthy=%v reason=%s", event.Snapshot.ID, event.Healthy, event.Reason) }, }, }) stats := pool.Stats() log.Printf("available=%d healthy=%d active=%d", stats.SessionsAvailable, stats.SessionsHealthy, stats.ActiveStreams)

Stats 包含 session 数、可调度数、维护态数、容量受限数、健康数、探测候选数、active/pending streams、累计 open attempt/failure、affinity binding 统计与每个 session 的快照。 当调用 DrainSession / SessionRef.Drain 时,快照中的 DrainingDrainReasonDrainStartedAtSessionsDraining 会同步反映维护态。 Hook panic 会被 Pool 隔离,并通过 OnHookPanic 回调暴露,避免第三方观测回调击穿调度主流程。

在线更新 Session 属性

调用方可以在 session 存活期间动态调整其可变属性:

snapshot, err := pool.UpdateSession( ref.Key, balancer.WithSessionLabel("edge-canary"), balancer.WithSessionWeight(8), balancer.MergeSessionMetadata(map[string]string{ "version": "v2", }), )

当前支持:

  • WithSessionLabel
  • WithSessionWeight
  • ReplaceSessionMetadata
  • MergeSessionMetadata
  • DeleteSessionMetadataKeys

生产建议

  1. 在 WebSocket / HTTP upgrade 层完成鉴权、origin、read limit 与 compression 策略。
  2. 每个通过 Pool.Open / OpenStream 拿到的连接必须及时 Close()
  3. 自定义 Strategy 必须 goroutine-safe,并只返回合法候选下标;panic 会被转换为 ErrInvalidStrategy
  4. 对非幂等请求做上层 retry / idempotency 保护,balancer 只负责选择 session,不理解业务语义。