MQTTX 是一个为 Go 应用程序设计的高性能多会话 MQTT 客户端库。经过深度优化,提供了卓越的性能、简洁的 API 和强大的功能。
go get github.com/darkit/mqttx
package main
import (
"log"
"time"
"github.com/darkit/mqttx"
)
func main() {
// 创建会话管理器
manager := mqttx.NewSessionManager()
defer manager.Close()
// 使用 Builder 模式创建会话
opts, err := mqttx.QuickConnect("生产设备", "broker.example.com:1883").
Auth("username", "password").
KeepAlive(60).
AutoReconnect().
Build()
if err != nil {
log.Fatal(err)
}
// 添加会话并连接
if err := manager.AddSession(opts); err != nil {
log.Fatal(err)
}
if err := manager.ConnectAll(); err != nil {
log.Fatal(err)
}
// 等待连接完成
if err := manager.WaitForAllSessions(30 * time.Second); err != nil {
log.Printf("连接警告: %v", err)
}
// 发布和订阅消息
session, _ := manager.Session("生产设备")
// 订阅主题
handler := func(topic string, payload []byte) {
log.Printf("收到消息: %s = %s", topic, string(payload))
}
session.Subscribe("sensors/+/temperature", 1, handler)
// 发布消息
session.Publish("sensors/room1/temperature", []byte("23.5"), 1, false)
select {} // 保持运行
}
// 向所有会话广播(非保留)
errs := manager.PublishToAll("alerts/system", []byte("系统维护开始"), 1)
// 向所有会话广播保留消息
errs = manager.PublishRetainedToAll("alerts/system", []byte("最新公告"), 1)
if len(errs) > 0 {
log.Printf("广播过程中出现 %d 个错误", len(errs))
}
MQTTX 提供了流畅的 API 来简化配置:
// 快速连接
opts, err := mqttx.QuickConnect("session-name", "localhost:1883").Build()
// 安全连接
opts, err := mqttx.SecureConnect("secure-session", "ssl://broker:8883", "/path/to/ca.crt").
Auth("user", "pass").
KeepAlive(60).
Build()
// 复杂配置
opts, err := mqttx.NewSessionBuilder("production-session").
Brokers("tcp://broker1:1883", "tcp://broker2:1883").
ClientID("client-001").
Auth("admin", "secret").
TLS("/etc/ssl/ca.crt", "/etc/ssl/client.crt", "/etc/ssl/client.key", false).
Performance(16, 5000).
RedisStorage("localhost:6379").
Subscribe("sensors/+", 1, handler).
Build()
自动在会话间转发消息:
// 创建转发器
config, err := mqttx.NewForwarderBuilder("sensor-forwarder").
Source("sensor-session", "sensors/+/temperature").
Target("storage-session").
QoS(1).
MapTopic("sensors/room1/temperature", "storage/room1/temp").
Build()
forwarder, err := mqttx.NewForwarder(config, manager)
forwarder.Start()
统一的错误处理机制:
// 检查错误类型
if mqttx.IsTemporary(err) {
// 临时错误,可重试
log.Printf("临时错误: %v", err)
} else if mqttx.IsTimeout(err) {
// 超时错误
log.Printf("超时错误: %v", err)
}
// 创建自定义错误
err := mqttx.NewConnectionError("连接失败", originalErr).
WithSession("my-session").
WithContext("retry_count", 3)
MQTTX 在标准硬件上的性能表现:
// 全局指标
globalMetrics := manager.Metrics()
log.Printf("总消息数: %d, 错误数: %d",
globalMetrics.TotalMessages, globalMetrics.ErrorCount)
// 会话指标
sessionMetrics := session.Metrics()
log.Printf("已发送: %d, 已接收: %d",
sessionMetrics.MessagesSent, sessionMetrics.MessagesReceived)
// 转发器指标
forwarderMetrics := forwarder.Metrics()
log.Printf("已转发: %d, 已丢弃: %d",
forwarderMetrics.MessagesSent, forwarderMetrics.MessagesDropped)
会话管理器(Manager)是处理多个 MQTT 会话的核心组件:
// 创建新的管理器
m := manager.NewSessionManager()
// 添加会话
err := m.AddSession(&manager.Options{...})
// 获取会话状态
status := m.AllSessionsStatus()
// 移除会话
err := m.RemoveSession("会话名称")
// 列出所有会话
sessions := m.ListSessions()
管理器提供连接等待机制,确保会话在操作前准备就绪:
// 等待特定会话连接
err := m.AddSession(opts)
if err != nil {
log.Fatal(err)
}
// 等待会话就绪,超时时间30秒
if err := m.WaitForSession("生产设备", 30*time.Second); err != nil {
log.Fatal(err)
}
// 或等待所有会话就绪
if err := m.WaitForAllSessions(30*time.Second); err != nil {
log.Fatal(err)
}
提供四种灵活的消息处理模式:
route := m.Handle("主题/#", func(msg *manager.Message) {
log.Printf("收到消息:%s", msg.PayloadString())
})
defer route.Stop()
route, err := m.HandleTo("会话名称", "主题/#", func(msg *manager.Message) {
log.Printf("会话收到消息:%s", msg.PayloadString())
})
defer route.Stop()
messages, route := m.Listen("主题/#")
go func() {
for msg := range messages {
log.Printf("收到消息:%s", msg.PayloadString())
}
}()
defer route.Stop()
messages, route, err := m.ListenTo("会话名称", "主题/#")
go func() {
for msg := range messages {
log.Printf("收到消息:%s", msg.PayloadString())
}
}()
defer route.Stop()
同一 topic 支持注册多个独立的 handler,每个 handler 通过唯一 ID 管理。Route.Stop() 仅解绑自身持有的订阅,不影响同 topic 的其他消费者:
// 使用 SubscribeWithID 获取订阅 ID
session, _ := m.Session("会话名称")
id, err := session.SubscribeWithID("sensors/+", func(topic string, payload []byte) {
log.Printf("Handler A: %s", string(payload))
}, 1)
// 注册第二个 handler(同一 topic,不会覆盖第一个)
id2, err := session.SubscribeWithID("sensors/+", func(topic string, payload []byte) {
log.Printf("Handler B: %s", string(payload))
}, 1)
// 按 ID 精确移除单个 handler,仅最后一个 handler 移除时才取消 broker 订阅
session.UnsubscribeByID("sensors/+", id)
// Handler B 仍然正常工作
多个 Route 可以安全地订阅同一 topic,互不干扰:
route1 := m.Handle("sensors/#", handlerA)
route2 := m.Handle("sensors/#", handlerB)
route1.Stop() // 仅停止 handlerA,handlerB 不受影响
消息转发器允许在不同会话和主题之间自动转发消息,支持过滤、转换和元数据注入:
// 创建转发器管理器
forwarderManager := mqttx.NewForwarderManager(manager)
// 配置转发器
forwarderConfig := mqttx.ForwarderConfig{
Name: "温度转发器",
SourceSessions: []string{"源会话1", "源会话2"},
SourceTopics: []string{"sensors/+/temperature"},
TargetSession: "目标会话",
TopicMapping: map[string]string{
"sensors/living-room/temperature": "processed/temperature/living-room",
},
QoS: 1,
Metadata: map[string]interface{}{
"forwarded_by": "温度转发器",
"timestamp": time.Now().Unix(),
},
Enabled: true,
}
// 添加并启动转发器
forwarder, err := forwarderManager.AddForwarder(forwarderConfig)
if err != nil {
log.Fatal(err)
}
// 获取转发器指标
metrics := forwarder.Metrics()
log.Printf("已转发消息: %d", metrics["messages_forwarded"])
// 停止所有转发器
forwarderManager.StopAll()
转发器支持以下功能:
监控会话生命周期和状态变化,提供详细的事件信息:
// 监控连接状态(返回取消订阅函数,可在不再需要时调用以释放资源)
unsubscribeReady := m.OnEvent("session_ready", func(event manager.Event) {
log.Printf("会话 %s 已准备就绪", event.Session)
})
defer unsubscribeReady()
// 监控状态变化
unsubscribeState := m.OnEvent("session_state_changed", func(event manager.Event) {
stateData := event.Data.(map[string]interface{})
log.Printf("会话 %s 状态从 %v 变更为 %v",
event.Session,
stateData["old_state"],
stateData["new_state"])
})
defer unsubscribeState()
可用事件:
session_connecting - 会话正在连接中session_connected - 会话已成功连接session_ready - 会话已准备就绪session_disconnected - 会话已断开连接(包含错误信息)session_reconnecting - 会话正在尝试重连session_added - 新会话已添加到管理器session_removed - 会话已从管理器中移除session_state_changed - 会话状态已发生变化事件数据结构:
type Event struct {
Type string // 事件类型
Session string // 会话名称
Data interface{} // 附加事件数据
Timestamp time.Time // 事件时间戳
}
常见事件数据内容:
session_connected:连接详情session_disconnected:错误信息(如果有)session_state_changed:包含 "old_state" 和 "new_state" 的映射session_reconnecting:重连尝试次数session_ready:会话配置摘要opts := &manager.Options{
Name: "安全会话",
Brokers: []string{"ssl://broker.example.com:8883"},
ClientID: "secure-client-001",
TLS: &manager.TLSConfig{
CAFile: "/path/to/ca.crt",
CertFile: "/path/to/client.crt",
KeyFile: "/path/to/client.key",
SkipVerify: false,
},
}
opts := &manager.Options{
Performance: &manager.PerformanceOptions{
WriteBufferSize: 4096,
ReadBufferSize: 4096,
MessageChanSize: 1000,
MaxMessageSize: 32 * 1024,
MaxPendingMessages: 5000,
WriteTimeout: time.Second * 30,
ReadTimeout: time.Second * 30,
},
}
opts := &manager.Options{
ConnectProps: &manager.ConnectProps{
PersistentSession: true,
ResumeSubs: true,
},
}
监控会话和管理器性能:
// 获取管理器级别的指标
metrics := m.Metrics()
// 获取特定会话的指标
session, _ := m.Session("会话名称")
sessionMetrics := session.Metrics()
// 获取所有转发器的指标
forwarderMetrics := forwarderManager.AllMetrics()
支持通过 HTTP 端点暴露 Prometheus 格式的指标:
// 创建 HTTP 服务暴露 Prometheus 指标
go func() {
promExporter := manager.NewPrometheusExporter("mqtt", manager.WithLabels(map[string]string{
"env": "dev",
}))
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
var output strings.Builder
// 收集管理器指标
metrics := m.Metrics()
output.WriteString(promExporter.Export(metrics))
// 收集所有会话指标
for _, name := range m.ListSessions() {
if session, err := m.Session(name); err == nil {
output.WriteString(promExporter.ExportWithSession(name, session.Metrics()))
}
}
w.Header().Set("Content-Type", "text/plain")
fmt.Fprint(w, output.String())
})
log.Printf("Starting metrics server on :2112")
http.ListenAndServe(":2112", nil)
}()
在 Prometheus 配置中添加抓取目标:
scrape_configs:
- job_name: 'mqtt_metrics'
static_configs:
- targets: ['localhost:2112']
scrape_interval: 15s
可用的 Prometheus 指标包括:
消息指标:
mqtt_session_messages_sent_total - 发送的消息总数mqtt_session_messages_received_total - 接收的消息总数mqtt_session_bytes_sent_total - 发送的字节总数mqtt_session_bytes_received_total - 接收的字节总数mqtt_session_message_rate - 当前每秒消息数mqtt_session_avg_message_rate - 启动以来的平均每秒消息数mqtt_session_bytes_rate - 每秒字节数状态指标:
mqtt_session_connected - 会话连接状态(0/1)mqtt_session_status - 会话状态码mqtt_session_subscriptions - 活跃订阅数量mqtt_session_errors_total - 错误总数mqtt_session_reconnects_total - 重连次数时间戳指标:
mqtt_session_last_message_timestamp_seconds - 最后消息的 Unix 时间戳mqtt_session_last_error_timestamp_seconds - 最后错误的 Unix 时间戳会话属性:
mqtt_session_persistent - 持久会话标志(0/1)mqtt_session_clean_session - 清理会话标志(0/1)mqtt_session_auto_reconnect - 自动重连标志(0/1)所有指标都包含 session="会话名称" 标签,便于按会话进行过滤和聚合。
资源管理
defer route.Stop() 清理订阅性能优化
HandleTo/ListenTo)可靠性
安全性
转发器使用
支持多种存储后端:
// 内存存储(默认,最快)
opts := mqttx.NewSessionBuilder("memory-session").
Broker("localhost:1883").
Build()
// 文件存储
opts := mqttx.NewSessionBuilder("file-session").
Broker("localhost:1883").
FileStorage("/var/lib/mqttx").
Build()
// Redis 存储
opts := mqttx.NewSessionBuilder("redis-session").
Broker("localhost:1883").
RedisStorage("localhost:6379").
RedisAuth("user", "pass", 1).
Build()
// 高性能配置
opts := mqttx.NewSessionBuilder("high-perf").
Broker("localhost:1883").
Performance(32, 10000). // 32KB缓冲区, 10K pending消息
MessageChannelSize(2000). // 2K消息通道
KeepAlive(300). // 5分钟保活
Timeouts(10, 5). // 10s连接, 5s写入超时
Build()
# 运行所有测试
go test ./...
# 运行基准测试
go test -bench=. -benchmem
# 运行并发安全测试
go test -race ./...
# 性能测试
go test -run TestPerformanceImprovement -v
欢迎提交 Issue 和 Pull Request!请确保:
本项目采用 MIT 许可证。
感谢以下项目的启发和支持:
MQTTX - 让 MQTT 客户端开发更简单、更高效!