Yamux (Yet another Multiplexer) 是一个高性能的 Go 语言流多路复用库。它在可靠的底层连接(如 TCP 或 Unix Domain Socket)之上提供面向流的多路复用能力,受 SPDY 协议启发但保持独立的协议设计。
context.Context 进行超时和取消控制balancer 子包提供 session pool、健康摘除、加权/最少连接/亲和策略balancer 子包支持单节点 Drain / Resume,便于滚动发布与故障隔离balancer 子包支持自定义失败分类规则,按业务语义定义节点摘除门槛go get cnb.cool/zishuo/yamux
要求 Go 1.23 或更高版本。
package main
import (
"log"
"net"
"time"
"cnb.cool/zishuo/yamux"
)
// 客户端示例
func runClient() {
// 建立底层 TCP 连接
conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {
log.Fatal(err)
}
// 创建 Yamux 客户端会话;成功后由 session 接管 conn 的生命周期
session, err := yamux.Client(conn, nil)
if err != nil {
conn.Close()
log.Fatal(err)
}
defer session.Close()
// 打开新流
stream, err := session.Open()
if err != nil {
log.Fatal(err)
}
defer stream.Close()
// 使用流进行通信(实现 net.Conn 接口)
if _, err := stream.Write([]byte("Hello, Yamux!")); err != nil {
log.Printf("Write error: %v", err)
}
}
// 服务器示例
func runServer() {
listener, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal(err)
}
defer listener.Close()
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("Accept error: %v", err)
continue
}
go handleConnection(conn)
}
}
func handleConnection(conn net.Conn) {
// 创建 Yamux 服务器会话;成功后由 session 接管 conn 的生命周期
session, err := yamux.Server(conn, nil)
if err != nil {
log.Printf("Session error: %v", err)
conn.Close()
return
}
defer session.Close()
// 接受流
for {
stream, err := session.Accept()
if err != nil {
log.Printf("Accept stream error: %v", err)
return
}
go handleStream(stream)
}
}
func handleStream(stream net.Conn) {
defer stream.Close()
buf := make([]byte, 1024)
n, err := stream.Read(buf)
if err != nil {
log.Printf("Read error: %v", err)
return
}
log.Printf("Received: %s", string(buf[:n]))
}
// 带超时的流打开
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
stream, err := session.OpenStreamContext(ctx)
if err != nil {
// 处理超时或取消
}
┌─────────────────────────────────────────────────────────────┐ │ Session │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │ │ Stream 1 │ │ Stream 2 │ │ Stream N │ │ │ │ (net.Conn) │ │ (net.Conn) │ │ (net.Conn) │ │ │ └──────┬──────┘ └──────┬──────┘ └──────────┬──────────┘ │ │ └─────────────────┴────────────────────┘ │ │ │ │ │ 多路复用层 │ │ │ │ │ ┌────────────────────────────────────────────────────────┐ │ │ │ Framing (12-byte Header) │ │ │ │ ┌─────────┬────────┬──────────┬───────────┬─────────┐ │ │ │ │ │ Version │ Type │ Flags │ StreamID │ Length │ │ │ │ │ │ 8-bit │ 8-bit │ 16-bit │ 32-bit │ 32-bit │ │ │ │ │ └─────────┴────────┴──────────┴───────────┴─────────┘ │ │ │ └────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌──────────┴──────────┐ │ │ │ TCP/Unix Conn │ │ │ └─────────────────────┘ │ └─────────────────────────────────────────────────────────────┘
┌───────────┐ │ Init │ └─────┬─────┘ │ SYN ▼ ┌───────────┐ ACK ┌─────────────┐ │ SYN-Sent │─────────────▶│ Established │ └───────────┘ └──────┬──────┘ │ │ │ ┌──────────────────────┘ │ │ │ ▼ FIN (local) │ ┌─────────────┐ │ │ LocalClose │ │ └──────┬──────┘ │ │ FIN (remote) │ ▼ │ ┌─────────────┐ └───▶│ Closed │ └─────────────┘
// 创建服务器端会话
func Server(conn io.ReadWriteCloser, config *Config) (*Session, error)
// 创建客户端会话
func Client(conn io.ReadWriteCloser, config *Config) (*Session, error)
// 流管理
type Session struct {
// 打开新流
Open() (net.Conn, error)
OpenContext(ctx context.Context) (net.Conn, error)
OpenStream() (*Stream, error)
OpenStreamContext(ctx context.Context) (*Stream, error)
// 接受流
Accept() (net.Conn, error)
AcceptStream() (*Stream, error)
AcceptStreamWithContext(ctx context.Context) (*Stream, error)
// 监控
IsClosed() bool
CloseChan() <-chan struct{}
NumStreams() int
AcceptBacklog() int
SetHooks(hooks SessionHooks)
// 控制
Close() error
CloseContext(ctx context.Context) error
Drain(ctx context.Context) error
GoAway() error
Ping() (time.Duration, error)
// 运行时统计
Stats() SessionStats
// 地址
LocalAddr() net.Addr
RemoteAddr() net.Addr
}
type SessionStats struct {
ActiveStreams int
InflightStreams int
AcceptBacklog int
AcceptBacklogCapacity int
PendingPings int
LocalGoAway bool
RemoteGoAway bool
Closed bool
}
type StreamStats struct {
ID uint32
State string
RecvWindow uint32
SendWindow uint32
BufferedBytes int
Closed bool
}
type SessionHooks struct {
OnStreamOpen func(StreamEvent)
OnStreamAccept func(StreamEvent)
OnStreamClose func(StreamEvent)
OnStreamReset func(StreamEvent)
OnGoAway func(GoAwayEvent)
OnFlowControlStall func(FlowControlStallEvent)
}
type StreamEvent struct {
Session *Session
Stream *Stream
StreamID uint32
Inbound bool
State string
Reason StreamEventReason
}
type GoAwayEvent struct {
Session *Session
Remote bool
Code uint32
}
type FlowControlStallEvent struct {
Session *Session
Stream *Stream
StreamID uint32
Inbound bool
SendWindow uint32
PendingBytes int
}
这些 hooks 仅暴露事件,不内置 OpenTelemetry / Prometheus 依赖,便于上层自行接入指标、日志与审计链路。 回调会在 yamux 内部 goroutine 上同步执行,建议保持轻量、快速返回。
Stream 实现了完整的 net.Conn 接口:
type Stream struct {
// 标准 IO 接口
Read(p []byte) (n int, err error)
Write(p []byte) (n int, err error)
Close() error
// 元数据
StreamID() uint32
Session() *Session
// 生命周期监控
IsClosed() bool
CloseChan() <-chan struct{}
Stats() StreamStats
// 半关闭支持
CloseWrite() error
HalfClose() error
// 超时控制
SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
// 内存优化
Shrink()
}
type Config struct {
// 待接受流的积压队列大小(默认:256)
AcceptBacklog int
// 启用保活探测(默认:true)
EnableKeepAlive bool
// 保活探测间隔(默认:30s)
KeepAliveInterval time.Duration
// 连接写入超时,作为安全阀值(默认:10s)
ConnectionWriteTimeout time.Duration
// 最大流窗口大小(默认:256KB)
MaxStreamWindowSize uint32
// 流打开超时,等待 ACK 的最大时间(默认:75s)
StreamOpenTimeout time.Duration
// 流关闭超时,半关闭状态的最大时间(默认:5m)
StreamCloseTimeout time.Duration
// 生命周期 hooks,零值表示关闭
Hooks SessionHooks
// 日志输出(与 Logger 二选一)
LogOutput io.Writer
// 自定义日志记录器(与 LogOutput 二选一)
Logger Logger
}
// 获取默认配置
func DefaultConfig() *Config
// 验证配置有效性
func VerifyConfig(config *Config) error
传入 Server / Client 的底层连接必须满足:
Close() 能够打断阻塞中的 Read / Writevar (
ErrInvalidVersion = errors.New("invalid protocol version")
ErrInvalidMsgType = errors.New("invalid msg type")
ErrSessionShutdown = errors.New("session shutdown")
ErrStreamsExhausted = errors.New("streams exhausted")
ErrDuplicateStream = errors.New("duplicate stream initiated")
ErrRecvWindowExceeded = errors.New("recv window exceeded")
ErrTimeout = &NetError{timeout: true}
ErrStreamClosed = errors.New("stream closed")
ErrUnexpectedFlag = errors.New("unexpected flag")
ErrRemoteGoAway = errors.New("remote end is not accepting connections")
ErrConnectionReset = errors.New("connection reset")
ErrConnectionWriteTimeout = errors.New("connection write timeout")
ErrKeepAliveTimeout = errors.New("keepalive timeout")
ErrInvalidConnection = errors.New("invalid nil connection")
)
websocket 子包将 WebSocket message 连接适配为 yamux 所需的字节流连接,同时保持 core 包本身不绑定具体 WebSocket 实现。
yamux core 无第三方 WebSocket 强依赖import (
"cnb.cool/zishuo/yamux/websocket"
)
// rawWSConn 需要满足 websocket.FramedConn 接口
session, err := websocket.Client(rawWSConn, nil, nil)
if err != nil {
return err
}
defer session.Close()
binary frameNextReader / NextWriter 风格接口更多细节见 websocket/README.md。
balancer 子包在多条 yamux.Session 之间提供 session pool 与负载均衡能力,适合 WebSocket tunnel、多边缘客户端、反向代理网关等多客户端接入场景。
round-robin、random、weighted、least-conn、affinity 策略Stream 包装器自动维护 active stream 计数Stats(),不绑定具体指标后端import (
"context"
"time"
"cnb.cool/zishuo/yamux"
"cnb.cool/zishuo/yamux/balancer"
)
pool, err := balancer.New(balancer.Config{
Strategy: balancer.MustStrategy(
balancer.StrategyAffinity,
balancer.WithStrategyAffinityTTL(10*time.Minute),
balancer.WithStrategyFallback(balancer.LeastConn()),
),
})
if err != nil {
return err
}
defer pool.Close()
session, err := yamux.Server(conn, nil)
if err != nil {
return err
}
ref, err := pool.Add(session, balancer.SessionOptions{
ID: "node-a",
Weight: 3,
MaxActiveStreams: 128,
})
if err != nil {
session.Close()
return err
}
defer ref.Close()
// 滚动发布时可先摘出调度,再恢复
ref.Drain("rolling update")
defer ref.Resume()
stream, err := pool.Open(context.Background(), balancer.RoutingContext{
AffinityKey: "tenant-a",
})
if err != nil {
return err
}
defer stream.Close()
stream, err = pool.Open(context.Background(), balancer.RoutingContext{
SessionID: "edge",
RequiredMetadata: map[string]string{
"zone": "ap-shanghai",
},
})
if err != nil {
return err
}
defer stream.Close()
_, _ = ref.Update(
balancer.WithSessionWeight(8),
balancer.MergeSessionMetadata(map[string]string{"version": "v2"}),
)
更多细节见 balancer/README.md。
stream 在 Yamux 之上提供基于协议内容的多路分发能力,支持 cmux 风格的协议路由。
在同一底层连接上同时服务多种协议:
import (
"cnb.cool/zishuo/yamux"
"cnb.cool/zishuo/yamux/stream"
)
func multiplexServer(conn net.Conn) error {
// 创建 Yamux 会话
session, err := yamux.Server(conn, nil)
if err != nil {
return err
}
defer session.Close()
// 创建多路分发器
mux := stream.New(session)
// 按协议类型分发
http1L := mux.Match(stream.HTTP1Fast())
http2L := mux.Match(stream.HTTP2())
tlsL := mux.Match(stream.TLS())
defaultL := mux.Match(stream.Any())
// 启动各协议处理器
go http.Serve(http1L, http1Handler)
go http.Serve(http2L, http2Handler)
go serveTLS(tlsL)
go handleRaw(defaultL)
return mux.Serve()
}
// 通用匹配器
stream.Any() // 匹配任意流
stream.AnyOf(stream.HTTP1Fast(), stream.HTTP2()) // 任一 matcher 命中
stream.AllOf(
stream.HTTP1Fast(),
stream.HTTP1HeaderField("Host", "api.example.com"),
)
stream.PrefixMatcher("GET ", "POST ") // 前缀匹配
stream.BytePrefixMatcher([]byte{0x16, 0x03, 0x03}) // 二进制前缀匹配
// HTTP 协议
stream.HTTP1Fast() // 快速 HTTP/1.x 检测(推荐)
stream.HTTP1() // 完整 HTTP/1.x 解析
stream.HTTP1HeaderField("Host", "api.example.com")
stream.HTTP1HeaderFieldPrefix("User-Agent", "MyApp/")
stream.HTTP2() // HTTP/2 检测
stream.HTTP2MatchSendSettings() // HTTP/2 带 SETTINGS 响应
stream.MCPStreamableHTTP("/mcp") // MCP Streamable HTTP 检测
// TLS
stream.TLS() // TLS ClientHello 检测
stream.TLS(tls.VersionTLS12) // 指定 TLS 版本
stream.TLSSNI("api.example.com") // TLS SNI 检测
stream.TLSALPN("h2") // TLS ALPN 检测
stream.TLSClientHello(func(info stream.TLSClientHelloInfo) bool {
return info.ServerName == "api.example.com"
}) // 自定义 ClientHello 路由
mux.SetHooks(stream.Hooks{
OnServeStart: func(addr net.Addr) {
log.Printf("Server started on %s", addr)
},
OnServeStop: func(err error) {
log.Printf("Server stopped: %v", err)
},
OnAccept: func(conn net.Conn) {
log.Printf("New connection from %s", conn.RemoteAddr())
},
OnMatch: func(event stream.MatchEvent) {
log.Printf("Stream matched: listener=%d, matcher=%d",
event.ListenerIndex, event.MatcherIndex)
},
OnUnmatched: func(conn net.Conn) {
log.Printf("Unmatched connection from %s", conn.RemoteAddr())
},
OnError: func(err error) {
log.Printf("Error: %v", err)
},
})
每个 Yamux 帧包含 12 字节的固定头部:
| 字段 | 大小 | 说明 |
|---|---|---|
| Version | 1 byte | 协议版本(当前为 0) |
| Type | 1 byte | 消息类型 |
| Flags | 2 bytes | 标志位 |
| StreamID | 4 bytes | 流标识符 |
| Length | 4 bytes | 负载长度 |
| 类型 | 值 | 说明 |
|---|---|---|
| Data | 0x0 | 数据传输 |
| WindowUpdate | 0x1 | 窗口更新(流控) |
| Ping | 0x2 | 心跳/RTT 测量 |
| GoAway | 0x3 | 会话终止 |
| 标志 | 值 | 说明 |
|---|---|---|
| SYN | 0x1 | 发起新流 |
| ACK | 0x2 | 确认流建立 |
| FIN | 0x4 | 半关闭 |
| RST | 0x8 | 强制重置 |
详细协议规范请参阅 spec.md。
config := yamux.DefaultConfig()
// 高并发场景增加积压队列
config.AcceptBacklog = 1024
// 调整窗口大小适应高带宽延迟积网络
config.MaxStreamWindowSize = 1024 * 1024 // 1MB
// 根据网络环境调整保活间隔
config.KeepAliveInterval = 15 * time.Second
// 生产环境建议设置合理的超时
config.StreamOpenTimeout = 30 * time.Second
config.StreamCloseTimeout = 2 * time.Minute
// 始终使用 defer 确保资源释放
session, err := yamux.Client(conn, config)
if err != nil {
return err
}
defer session.Close()
// 流使用完毕及时关闭
stream, err := session.Open()
if err != nil {
return err
}
defer stream.Close()
// 定期调用 Shrink 释放空闲内存
if s, ok := stream.(*yamux.Stream); ok {
s.Shrink()
}
// 区分可恢复和不可恢复错误
stream, err := session.AcceptStream()
if err != nil {
if err == yamux.ErrSessionShutdown {
// 会话正常关闭,优雅退出
return
}
if errors.Is(err, yamux.ErrRemoteGoAway) {
// 对端拒绝新连接
return
}
// 处理其他错误
}
// 监控会话状态
log.Printf("Active streams: %d", session.NumStreams())
log.Printf("Accept backlog: %d", session.AcceptBacklog())
// 使用生命周期钩子进行指标收集
session.SetHooks(yamux.SessionHooks{
OnFlowControlStall: func(event yamux.FlowControlStallEvent) {
metrics.IncCounter("yamux.flow_control_stall",
"stream_id", strconv.FormatUint(uint64(event.StreamID), 10),
)
},
})
// stream hooks 与 session hooks 可组合使用
mux.SetHooks(stream.Hooks{
OnMatch: func(event stream.MatchEvent) {
metrics.IncCounter("streams.matched",
"listener", strconv.Itoa(event.ListenerIndex),
"matcher", strconv.Itoa(event.MatcherIndex),
)
},
})
// 生产者-消费者模式
func producer(session *yamux.Session, workCh chan<- net.Conn) {
for {
stream, err := session.Accept()
if err != nil {
close(workCh)
return
}
workCh <- stream
}
}
func consumers(workCh <-chan net.Conn, n int) {
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for stream := range workCh {
handleStream(stream)
}
}()
}
wg.Wait()
}
/workspace ├── mux.go # 配置和会话创建入口 ├── session.go # 会话管理核心实现 ├── stream.go # 流实现(net.Conn 接口) ├── addr.go # 地址相关实现 ├── const.go # 协议常量、错误定义、帧格式 ├── util.go # 工具函数和 Logger 接口 ├── hooks.go # 生命周期钩子定义与触发逻辑 ├── stats.go # Session / Stream 运行时统计快照 ├── lifecycle.go # Drain、CloseContext 等生命周期控制 ├── spec.md # 协议规范文档 ├── websocket/ # WebSocket transport 子包 │ ├── doc.go # 包文档 │ ├── conn.go # WebSocket -> net.Conn 适配器 │ ├── errors.go # 子包错误定义 │ ├── README.md # 子包文档 │ └── DESIGN.md # 设计说明 ├── balancer/ # 多 session 负载均衡子包 │ ├── doc.go # 包文档 │ ├── pool.go # session pool 与 open stream 编排 │ ├── strategy.go # 内置负载均衡策略 │ ├── affinity.go # sticky affinity 策略与 binding 治理 │ ├── stream.go # active stream 跟踪包装器 │ ├── README.md # 子包文档 │ └── DESIGN.md # 设计说明 ├── stream/ # 协议多路分发子包 │ ├── doc.go # 包文档 │ ├── mux.go # 多路分发器实现 │ ├── matchers.go # 协议匹配器 │ ├── buffer.go # 缓冲 sniff 实现 │ ├── listener.go # 子 listener 实现 │ ├── stats.go # mux / listener / matcher 分层统计 │ ├── DESIGN.md # 设计说明 │ └── README.md # 子包文档 ├── examples/ # 独立示例工程 │ ├── README.md # 示例索引 │ ├── gorilla-nethttp # gorilla/websocket + net/http upgrade 示例 │ ├── coder-websocket # coder/websocket 官方 NetConn 桥接示例 │ ├── balancer-gateway # 零额外依赖的 balancer gateway 示例 │ └── gorilla-balancer-gateway # websocket tunnel + balancer 完整示例 ├── README.md # 本文件 ├── CHANGELOG.md # 变更日志 ├── LICENSE # MPL-2.0 许可证 ├── go.mod # Go 模块定义 └── testdata/ # 测试证书和数据
go test ./... 确保所有测试通过go vet ./... 检查静态分析问题README.md、CHANGELOG.md 及相关子包文档websocket、balancer 与 stream 子包应保持最小依赖,避免引入第三方库到 core 包session.Close() 后 Stats() / NumStreams() 返回的状态是否准确errors.Is 可识别本项目采用 Mozilla Public License 2.0 许可证。
Copyright IBM Corp. 2014, 2025 SPDX-License-Identifier: MPL-2.0