logo
0
0
WeChat Login

Yamux

Go Reference Go Report Card

Yamux (Yet another Multiplexer) 是一个高性能的 Go 语言流多路复用库。它在可靠的底层连接(如 TCP 或 Unix Domain Socket)之上提供面向流的多路复用能力,受 SPDY 协议启发但保持独立的协议设计。

目录

核心特性

  • 双向流:客户端和服务器均可主动发起流,适用于 NAT 穿透和服务端推送场景
  • 流量控制:基于滑动窗口的流控机制,防止接收方过载,避免流饥饿
  • 连接保活:内置 Ping/Keepalive 机制,支持负载均衡器环境下的长连接
  • 高性能:支持数千个逻辑流,低开销设计
  • 优雅关闭:支持半关闭(Half-Close)和强制重置(RST)语义
  • 可观测 Hooks:零依赖暴露 stream/session 生命周期与流控阻塞事件
  • 上下文支持:API 支持 context.Context 进行超时和取消控制
  • 可扩展 transport:可通过独立子包桥接 WebSocket 等 message-oriented 连接
  • 多 session 负载均衡balancer 子包提供 session pool、健康摘除、加权/最少连接/亲和策略
  • 维护态调度控制balancer 子包支持单节点 Drain / Resume,便于滚动发布与故障隔离
  • 可插拔失败分类balancer 子包支持自定义失败分类规则,按业务语义定义节点摘除门槛

安装

go get cnb.cool/zishuo/yamux

要求 Go 1.23 或更高版本。

快速开始

基础示例

package main import ( "log" "net" "time" "cnb.cool/zishuo/yamux" ) // 客户端示例 func runClient() { // 建立底层 TCP 连接 conn, err := net.Dial("tcp", "localhost:8080") if err != nil { log.Fatal(err) } // 创建 Yamux 客户端会话;成功后由 session 接管 conn 的生命周期 session, err := yamux.Client(conn, nil) if err != nil { conn.Close() log.Fatal(err) } defer session.Close() // 打开新流 stream, err := session.Open() if err != nil { log.Fatal(err) } defer stream.Close() // 使用流进行通信(实现 net.Conn 接口) if _, err := stream.Write([]byte("Hello, Yamux!")); err != nil { log.Printf("Write error: %v", err) } } // 服务器示例 func runServer() { listener, err := net.Listen("tcp", ":8080") if err != nil { log.Fatal(err) } defer listener.Close() for { conn, err := listener.Accept() if err != nil { log.Printf("Accept error: %v", err) continue } go handleConnection(conn) } } func handleConnection(conn net.Conn) { // 创建 Yamux 服务器会话;成功后由 session 接管 conn 的生命周期 session, err := yamux.Server(conn, nil) if err != nil { log.Printf("Session error: %v", err) conn.Close() return } defer session.Close() // 接受流 for { stream, err := session.Accept() if err != nil { log.Printf("Accept stream error: %v", err) return } go handleStream(stream) } } func handleStream(stream net.Conn) { defer stream.Close() buf := make([]byte, 1024) n, err := stream.Read(buf) if err != nil { log.Printf("Read error: %v", err) return } log.Printf("Received: %s", string(buf[:n])) }

使用 Context 控制

// 带超时的流打开 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() stream, err := session.OpenStreamContext(ctx) if err != nil { // 处理超时或取消 }

架构设计

核心组件

┌─────────────────────────────────────────────────────────────┐ │ Session │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │ │ Stream 1 │ │ Stream 2 │ │ Stream N │ │ │ │ (net.Conn) │ │ (net.Conn) │ │ (net.Conn) │ │ │ └──────┬──────┘ └──────┬──────┘ └──────────┬──────────┘ │ │ └─────────────────┴────────────────────┘ │ │ │ │ │ 多路复用层 │ │ │ │ │ ┌────────────────────────────────────────────────────────┐ │ │ │ Framing (12-byte Header) │ │ │ │ ┌─────────┬────────┬──────────┬───────────┬─────────┐ │ │ │ │ │ Version │ Type │ Flags │ StreamID │ Length │ │ │ │ │ │ 8-bit │ 8-bit │ 16-bit │ 32-bit │ 32-bit │ │ │ │ │ └─────────┴────────┴──────────┴───────────┴─────────┘ │ │ │ └────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌──────────┴──────────┐ │ │ │ TCP/Unix Conn │ │ │ └─────────────────────┘ │ └─────────────────────────────────────────────────────────────┘

流状态机

┌───────────┐ │ Init │ └─────┬─────┘ │ SYN ▼ ┌───────────┐ ACK ┌─────────────┐ │ SYN-Sent │─────────────▶│ Established │ └───────────┘ └──────┬──────┘ │ │ │ ┌──────────────────────┘ │ │ │ ▼ FIN (local) │ ┌─────────────┐ │ │ LocalClose │ │ └──────┬──────┘ │ │ FIN (remote) │ ▼ │ ┌─────────────┐ └───▶│ Closed │ └─────────────┘

API 文档

会话管理

创建会话

// 创建服务器端会话 func Server(conn io.ReadWriteCloser, config *Config) (*Session, error) // 创建客户端会话 func Client(conn io.ReadWriteCloser, config *Config) (*Session, error)

会话方法

// 流管理 type Session struct { // 打开新流 Open() (net.Conn, error) OpenContext(ctx context.Context) (net.Conn, error) OpenStream() (*Stream, error) OpenStreamContext(ctx context.Context) (*Stream, error) // 接受流 Accept() (net.Conn, error) AcceptStream() (*Stream, error) AcceptStreamWithContext(ctx context.Context) (*Stream, error) // 监控 IsClosed() bool CloseChan() <-chan struct{} NumStreams() int AcceptBacklog() int SetHooks(hooks SessionHooks) // 控制 Close() error CloseContext(ctx context.Context) error Drain(ctx context.Context) error GoAway() error Ping() (time.Duration, error) // 运行时统计 Stats() SessionStats // 地址 LocalAddr() net.Addr RemoteAddr() net.Addr }

运行时状态快照

type SessionStats struct { ActiveStreams int InflightStreams int AcceptBacklog int AcceptBacklogCapacity int PendingPings int LocalGoAway bool RemoteGoAway bool Closed bool } type StreamStats struct { ID uint32 State string RecvWindow uint32 SendWindow uint32 BufferedBytes int Closed bool }

生命周期 Hooks

type SessionHooks struct { OnStreamOpen func(StreamEvent) OnStreamAccept func(StreamEvent) OnStreamClose func(StreamEvent) OnStreamReset func(StreamEvent) OnGoAway func(GoAwayEvent) OnFlowControlStall func(FlowControlStallEvent) } type StreamEvent struct { Session *Session Stream *Stream StreamID uint32 Inbound bool State string Reason StreamEventReason } type GoAwayEvent struct { Session *Session Remote bool Code uint32 } type FlowControlStallEvent struct { Session *Session Stream *Stream StreamID uint32 Inbound bool SendWindow uint32 PendingBytes int }

这些 hooks 仅暴露事件,不内置 OpenTelemetry / Prometheus 依赖,便于上层自行接入指标、日志与审计链路。 回调会在 yamux 内部 goroutine 上同步执行,建议保持轻量、快速返回。

流操作

Stream 实现了完整的 net.Conn 接口:

type Stream struct { // 标准 IO 接口 Read(p []byte) (n int, err error) Write(p []byte) (n int, err error) Close() error // 元数据 StreamID() uint32 Session() *Session // 生命周期监控 IsClosed() bool CloseChan() <-chan struct{} Stats() StreamStats // 半关闭支持 CloseWrite() error HalfClose() error // 超时控制 SetDeadline(t time.Time) error SetReadDeadline(t time.Time) error SetWriteDeadline(t time.Time) error // 内存优化 Shrink() }

配置选项

type Config struct { // 待接受流的积压队列大小(默认:256) AcceptBacklog int // 启用保活探测(默认:true) EnableKeepAlive bool // 保活探测间隔(默认:30s) KeepAliveInterval time.Duration // 连接写入超时,作为安全阀值(默认:10s) ConnectionWriteTimeout time.Duration // 最大流窗口大小(默认:256KB) MaxStreamWindowSize uint32 // 流打开超时,等待 ACK 的最大时间(默认:75s) StreamOpenTimeout time.Duration // 流关闭超时,半关闭状态的最大时间(默认:5m) StreamCloseTimeout time.Duration // 生命周期 hooks,零值表示关闭 Hooks SessionHooks // 日志输出(与 Logger 二选一) LogOutput io.Writer // 自定义日志记录器(与 LogOutput 二选一) Logger Logger } // 获取默认配置 func DefaultConfig() *Config // 验证配置有效性 func VerifyConfig(config *Config) error

底层连接契约

传入 Server / Client 的底层连接必须满足:

  • 可靠、有序、全双工的字节流
  • Close() 能够打断阻塞中的 Read / Write
  • 当接入 message-oriented transport(如 WebSocket)时,应先通过独立子包完成桥接

错误处理

var ( ErrInvalidVersion = errors.New("invalid protocol version") ErrInvalidMsgType = errors.New("invalid msg type") ErrSessionShutdown = errors.New("session shutdown") ErrStreamsExhausted = errors.New("streams exhausted") ErrDuplicateStream = errors.New("duplicate stream initiated") ErrRecvWindowExceeded = errors.New("recv window exceeded") ErrTimeout = &NetError{timeout: true} ErrStreamClosed = errors.New("stream closed") ErrUnexpectedFlag = errors.New("unexpected flag") ErrRemoteGoAway = errors.New("remote end is not accepting connections") ErrConnectionReset = errors.New("connection reset") ErrConnectionWriteTimeout = errors.New("connection write timeout") ErrKeepAliveTimeout = errors.New("keepalive timeout") ErrInvalidConnection = errors.New("invalid nil connection") )

WebSocket transport 子包

websocket 子包将 WebSocket message 连接适配为 yamux 所需的字节流连接,同时保持 core 包本身不绑定具体 WebSocket 实现。

适用场景

  • 浏览器 / Gateway / LB 后的长连接复用
  • 需要在 WebSocket 隧道中承载多个逻辑流
  • 希望保持 yamux core 无第三方 WebSocket 强依赖

快速示例

import ( "cnb.cool/zishuo/yamux/websocket" ) // rawWSConn 需要满足 websocket.FramedConn 接口 session, err := websocket.Client(rawWSConn, nil, nil) if err != nil { return err } defer session.Close()

子包特性

  • 默认仅接受 binary frame
  • 兼容 NextReader / NextWriter 风格接口
  • 可按需启用 transport 级 Ping
  • deadline / 地址能力按底层实现自动透传

更多细节见 websocket/README.md

balancer 子包

balancer 子包在多条 yamux.Session 之间提供 session pool 与负载均衡能力,适合 WebSocket tunnel、多边缘客户端、反向代理网关等多客户端接入场景。

子包特性

  • 内置 round-robinrandomweightedleast-connaffinity 策略
  • 基于连续失败阈值的健康摘除与冷却探测
  • Stream 包装器自动维护 active stream 计数
  • affinity binding 基于 Pool 内部唯一 session key,默认带 TTL 与容量上限
  • 暴露 panic-safe hooks 与 Stats(),不绑定具体指标后端

快速示例

import ( "context" "time" "cnb.cool/zishuo/yamux" "cnb.cool/zishuo/yamux/balancer" ) pool, err := balancer.New(balancer.Config{ Strategy: balancer.MustStrategy( balancer.StrategyAffinity, balancer.WithStrategyAffinityTTL(10*time.Minute), balancer.WithStrategyFallback(balancer.LeastConn()), ), }) if err != nil { return err } defer pool.Close() session, err := yamux.Server(conn, nil) if err != nil { return err } ref, err := pool.Add(session, balancer.SessionOptions{ ID: "node-a", Weight: 3, MaxActiveStreams: 128, }) if err != nil { session.Close() return err } defer ref.Close() // 滚动发布时可先摘出调度,再恢复 ref.Drain("rolling update") defer ref.Resume() stream, err := pool.Open(context.Background(), balancer.RoutingContext{ AffinityKey: "tenant-a", }) if err != nil { return err } defer stream.Close() stream, err = pool.Open(context.Background(), balancer.RoutingContext{ SessionID: "edge", RequiredMetadata: map[string]string{ "zone": "ap-shanghai", }, }) if err != nil { return err } defer stream.Close() _, _ = ref.Update( balancer.WithSessionWeight(8), balancer.MergeSessionMetadata(map[string]string{"version": "v2"}), )

更多细节见 balancer/README.md

stream 子包

stream 在 Yamux 之上提供基于协议内容的多路分发能力,支持 cmux 风格的协议路由。

使用场景

在同一底层连接上同时服务多种协议:

  • HTTP/1.x 和 HTTP/2
  • TLS 和普通 TCP
  • gRPC 和 REST API

快速示例

import ( "cnb.cool/zishuo/yamux" "cnb.cool/zishuo/yamux/stream" ) func multiplexServer(conn net.Conn) error { // 创建 Yamux 会话 session, err := yamux.Server(conn, nil) if err != nil { return err } defer session.Close() // 创建多路分发器 mux := stream.New(session) // 按协议类型分发 http1L := mux.Match(stream.HTTP1Fast()) http2L := mux.Match(stream.HTTP2()) tlsL := mux.Match(stream.TLS()) defaultL := mux.Match(stream.Any()) // 启动各协议处理器 go http.Serve(http1L, http1Handler) go http.Serve(http2L, http2Handler) go serveTLS(tlsL) go handleRaw(defaultL) return mux.Serve() }

内置 Matchers

// 通用匹配器 stream.Any() // 匹配任意流 stream.AnyOf(stream.HTTP1Fast(), stream.HTTP2()) // 任一 matcher 命中 stream.AllOf( stream.HTTP1Fast(), stream.HTTP1HeaderField("Host", "api.example.com"), ) stream.PrefixMatcher("GET ", "POST ") // 前缀匹配 stream.BytePrefixMatcher([]byte{0x16, 0x03, 0x03}) // 二进制前缀匹配 // HTTP 协议 stream.HTTP1Fast() // 快速 HTTP/1.x 检测(推荐) stream.HTTP1() // 完整 HTTP/1.x 解析 stream.HTTP1HeaderField("Host", "api.example.com") stream.HTTP1HeaderFieldPrefix("User-Agent", "MyApp/") stream.HTTP2() // HTTP/2 检测 stream.HTTP2MatchSendSettings() // HTTP/2 带 SETTINGS 响应 stream.MCPStreamableHTTP("/mcp") // MCP Streamable HTTP 检测 // TLS stream.TLS() // TLS ClientHello 检测 stream.TLS(tls.VersionTLS12) // 指定 TLS 版本 stream.TLSSNI("api.example.com") // TLS SNI 检测 stream.TLSALPN("h2") // TLS ALPN 检测 stream.TLSClientHello(func(info stream.TLSClientHelloInfo) bool { return info.ServerName == "api.example.com" }) // 自定义 ClientHello 路由

生命周期钩子

mux.SetHooks(stream.Hooks{ OnServeStart: func(addr net.Addr) { log.Printf("Server started on %s", addr) }, OnServeStop: func(err error) { log.Printf("Server stopped: %v", err) }, OnAccept: func(conn net.Conn) { log.Printf("New connection from %s", conn.RemoteAddr()) }, OnMatch: func(event stream.MatchEvent) { log.Printf("Stream matched: listener=%d, matcher=%d", event.ListenerIndex, event.MatcherIndex) }, OnUnmatched: func(conn net.Conn) { log.Printf("Unmatched connection from %s", conn.RemoteAddr()) }, OnError: func(err error) { log.Printf("Error: %v", err) }, })

协议规范

帧格式

每个 Yamux 帧包含 12 字节的固定头部:

字段大小说明
Version1 byte协议版本(当前为 0)
Type1 byte消息类型
Flags2 bytes标志位
StreamID4 bytes流标识符
Length4 bytes负载长度

消息类型

类型说明
Data0x0数据传输
WindowUpdate0x1窗口更新(流控)
Ping0x2心跳/RTT 测量
GoAway0x3会话终止

标志位

标志说明
SYN0x1发起新流
ACK0x2确认流建立
FIN0x4半关闭
RST0x8强制重置

StreamID 分配

  • 客户端:使用奇数 ID(1, 3, 5...)
  • 服务器:使用偶数 ID(2, 4, 6...)
  • StreamID 0:保留用于会话级消息(Ping/GoAway)

详细协议规范请参阅 spec.md

最佳实践

1. 配置调优

config := yamux.DefaultConfig() // 高并发场景增加积压队列 config.AcceptBacklog = 1024 // 调整窗口大小适应高带宽延迟积网络 config.MaxStreamWindowSize = 1024 * 1024 // 1MB // 根据网络环境调整保活间隔 config.KeepAliveInterval = 15 * time.Second // 生产环境建议设置合理的超时 config.StreamOpenTimeout = 30 * time.Second config.StreamCloseTimeout = 2 * time.Minute

2. 资源管理

// 始终使用 defer 确保资源释放 session, err := yamux.Client(conn, config) if err != nil { return err } defer session.Close() // 流使用完毕及时关闭 stream, err := session.Open() if err != nil { return err } defer stream.Close() // 定期调用 Shrink 释放空闲内存 if s, ok := stream.(*yamux.Stream); ok { s.Shrink() }

3. 错误处理

// 区分可恢复和不可恢复错误 stream, err := session.AcceptStream() if err != nil { if err == yamux.ErrSessionShutdown { // 会话正常关闭,优雅退出 return } if errors.Is(err, yamux.ErrRemoteGoAway) { // 对端拒绝新连接 return } // 处理其他错误 }

4. 监控和可观测性

// 监控会话状态 log.Printf("Active streams: %d", session.NumStreams()) log.Printf("Accept backlog: %d", session.AcceptBacklog()) // 使用生命周期钩子进行指标收集 session.SetHooks(yamux.SessionHooks{ OnFlowControlStall: func(event yamux.FlowControlStallEvent) { metrics.IncCounter("yamux.flow_control_stall", "stream_id", strconv.FormatUint(uint64(event.StreamID), 10), ) }, }) // stream hooks 与 session hooks 可组合使用 mux.SetHooks(stream.Hooks{ OnMatch: func(event stream.MatchEvent) { metrics.IncCounter("streams.matched", "listener", strconv.Itoa(event.ListenerIndex), "matcher", strconv.Itoa(event.MatcherIndex), ) }, })

5. 并发模式

// 生产者-消费者模式 func producer(session *yamux.Session, workCh chan<- net.Conn) { for { stream, err := session.Accept() if err != nil { close(workCh) return } workCh <- stream } } func consumers(workCh <-chan net.Conn, n int) { var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func() { defer wg.Done() for stream := range workCh { handleStream(stream) } }() } wg.Wait() }

项目结构

/workspace ├── mux.go # 配置和会话创建入口 ├── session.go # 会话管理核心实现 ├── stream.go # 流实现(net.Conn 接口) ├── addr.go # 地址相关实现 ├── const.go # 协议常量、错误定义、帧格式 ├── util.go # 工具函数和 Logger 接口 ├── hooks.go # 生命周期钩子定义与触发逻辑 ├── stats.go # Session / Stream 运行时统计快照 ├── lifecycle.go # Drain、CloseContext 等生命周期控制 ├── spec.md # 协议规范文档 ├── websocket/ # WebSocket transport 子包 │ ├── doc.go # 包文档 │ ├── conn.go # WebSocket -> net.Conn 适配器 │ ├── errors.go # 子包错误定义 │ ├── README.md # 子包文档 │ └── DESIGN.md # 设计说明 ├── balancer/ # 多 session 负载均衡子包 │ ├── doc.go # 包文档 │ ├── pool.go # session pool 与 open stream 编排 │ ├── strategy.go # 内置负载均衡策略 │ ├── affinity.go # sticky affinity 策略与 binding 治理 │ ├── stream.go # active stream 跟踪包装器 │ ├── README.md # 子包文档 │ └── DESIGN.md # 设计说明 ├── stream/ # 协议多路分发子包 │ ├── doc.go # 包文档 │ ├── mux.go # 多路分发器实现 │ ├── matchers.go # 协议匹配器 │ ├── buffer.go # 缓冲 sniff 实现 │ ├── listener.go # 子 listener 实现 │ ├── stats.go # mux / listener / matcher 分层统计 │ ├── DESIGN.md # 设计说明 │ └── README.md # 子包文档 ├── examples/ # 独立示例工程 │ ├── README.md # 示例索引 │ ├── gorilla-nethttp # gorilla/websocket + net/http upgrade 示例 │ ├── coder-websocket # coder/websocket 官方 NetConn 桥接示例 │ ├── balancer-gateway # 零额外依赖的 balancer gateway 示例 │ └── gorilla-balancer-gateway # websocket tunnel + balancer 完整示例 ├── README.md # 本文件 ├── CHANGELOG.md # 变更日志 ├── LICENSE # MPL-2.0 许可证 ├── go.mod # Go 模块定义 └── testdata/ # 测试证书和数据

贡献指南

开发环境

  • Go 1.23 或更高版本
  • 运行 go test ./... 确保所有测试通过
  • 运行 go vet ./... 检查静态分析问题

提交规范

  1. 保持协议兼容性:任何修改不得破坏现有 wire protocol 的向后兼容性
  2. 接口稳定性:公开 API 的变更需经过仔细评估,优先保持向后兼容
  3. 测试覆盖:核心逻辑修改需补充单元测试;涉及并发场景需补充 race 检测
  4. 文档同步:API 变更需同步更新 README.mdCHANGELOG.md 及相关子包文档
  5. 子包隔离websocketbalancerstream 子包应保持最小依赖,避免引入第三方库到 core 包

代码审查清单

  • session.Close()Stats() / NumStreams() 返回的状态是否准确
  • 新加错误是否通过 errors.Is 可识别
  • Hooks 回调是否保持轻量、无阻塞
  • transport 适配层是否正确处理 typed-nil 连接

许可证

本项目采用 Mozilla Public License 2.0 许可证。

Copyright IBM Corp. 2014, 2025 SPDX-License-Identifier: MPL-2.0

相关资源

About

这是一个基于自定义工作流自动创建的临时仓库

45.22 MiB
0 forks0 stars12 branches6 TagREADMEMPL-2.0 license
Language
Go100%