MQTT X is a multi-session MQTT client library for Go applications. It focuses on high throughput, a fluent builder-style API, and built-in features such as message forwarding, error recovery, and Prometheus metrics.
Before using MQTT X, ensure your environment meets these requirements:
go version
# Should display: go version go1.23 or higher
go get github.com/darkit/mqttx
package main
import (
"log"
"time"
"github.com/darkit/mqttx"
)
func main() {
// Create session manager
manager := mqttx.NewSessionManager()
defer manager.Close()
// Use Builder pattern to create session
opts, err := mqttx.QuickConnect("prod-device", "broker.example.com:1883").
Auth("username", "password").
KeepAlive(60).
AutoReconnect().
Build()
if err != nil {
log.Fatal(err)
}
// Add session (auto-connects)
if err := manager.AddSession(opts); err != nil {
log.Fatal(err)
}
// Wait for session to be ready
if err := manager.WaitForSession("prod-device", 30 * time.Second); err != nil {
log.Printf("Connection warnings: %v", err)
}
// Publish and subscribe messages
session, err := manager.Session("prod-device")
if err != nil {
log.Fatal(err)
}
// Subscribe to topic
handler := func(topic string, payload []byte) {
log.Printf("Received: %s = %s", topic, string(payload))
}
session.Subscribe("sensors/+/temperature", 1, handler)
// Publish message
session.Publish("sensors/room1/temperature", []byte("23.5"), 1, false)
select {} // Keep running
}
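The subscription in the example uses the single-level wildcard +, so it receives the message published to sensors/room1/temperature. The sketch below shows how MQTT topic filters match topic names in general. It is not part of MQTT X: matchTopic is a hypothetical helper, and it skips the special rules for topics that start with $.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether an MQTT topic filter matches a topic name.
// "+" matches exactly one level; "#" matches all remaining levels
// (including none) and is only valid as the last level of the filter.
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return i == len(f)-1
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	fmt.Println(matchTopic("sensors/+/temperature", "sensors/room1/temperature")) // true
	fmt.Println(matchTopic("sensors/+/temperature", "sensors/room1/humidity"))    // false
	fmt.Println(matchTopic("sensors/#", "sensors/room1/temperature"))             // true
}
```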
// Broadcast to all sessions (non-retained)
if errs := manager.PublishToAll("alerts/system", []byte("maintenance starting"), 1); len(errs) > 0 {
log.Printf("broadcast failed on %d sessions", len(errs))
}
// Broadcast a retained message to all sessions
if errs := manager.PublishRetainedToAll("alerts/system", []byte("latest bulletin"), 1); len(errs) > 0 {
log.Printf("retained broadcast failed on %d sessions", len(errs))
}
MQTT X provides fluent APIs to simplify configuration:
// Quick connect
opts, err := mqttx.QuickConnect("session-name", "localhost:1883").Build()
// Secure connect
opts, err := mqttx.SecureConnect("secure-session", "ssl://broker:8883", "/path/to/ca.crt").
Auth("user", "pass").
KeepAlive(60).
Build()
// Complex configuration
opts, err := mqttx.NewSessionBuilder("production-session").
Brokers("tcp://broker1:1883", "tcp://broker2:1883").
ClientID("client-001").
Auth("admin", "secret").
TLS("/etc/ssl/ca.crt", "/etc/ssl/client.crt", "/etc/ssl/client.key", false).
Performance(16, 5000).
RedisStorage("localhost:6379").
Subscribe("sensors/+", 1, handler).
Build()
Automatic message forwarding between sessions:
// Create forwarder
config, err := mqttx.NewForwarderBuilder("sensor-forwarder").
Source("sensor-session", "sensors/+/temperature").
Target("storage-session").
QoS(1).
MapTopic("sensors/room1/temperature", "storage/room1/temp").
Build()
forwarder, err := mqttx.NewForwarder(config, manager)
forwarder.Start()
Unified error handling mechanism:
// Check error types
if mqttx.IsTemporary(err) {
// Temporary error, can retry
log.Printf("Temporary error: %v", err)
} else if mqttx.IsTimeout(err) {
// Timeout error
log.Printf("Timeout error: %v", err)
}
// Create custom error
err := mqttx.NewConnectionError("connection failed", originalErr).
WithSession("my-session").
WithContext("retry_count", 3)
MQTT X performance on standard hardware (Go 1.23+, Intel i7-12700H):
| Metric | Performance | Notes |
|---|---|---|
| Message Throughput | 100K+ messages/sec | Single session max throughput |
| Atomic Operations | 4M+ operations/sec | Metrics update performance |
| Concurrent Sessions | 1000+ sessions | Per-process capacity |
| Memory Efficiency | < 5 bytes/metric | Per-metric object overhead |
| Forwarder Performance | 500K+ lifecycles/sec | Message forwarding throughput |
| Message Latency | < 1ms | Publish to receive (local broker) |
// Global metrics
globalMetrics := manager.Metrics()
log.Printf("Total messages: %d, Errors: %d",
globalMetrics.TotalMessages, globalMetrics.ErrorCount)
// Session metrics
sessionMetrics := session.Metrics()
log.Printf("Sent: %d, Received: %d",
sessionMetrics.MessagesSent, sessionMetrics.MessagesReceived)
// Forwarder metrics
forwarderMetrics := forwarder.Metrics()
log.Printf("Forwarded: %d, Dropped: %d",
forwarderMetrics.MessagesSent, forwarderMetrics.MessagesDropped)
The session manager (Manager) is the central component that handles multiple MQTT sessions:
// Create a new manager
m := mqttx.NewSessionManager()
// Add a session
err := m.AddSession(&mqttx.Options{...})
// Get session status
status := m.AllSessionsStatus()
// Remove a session
err = m.RemoveSession("session-name")
// List all sessions
sessions := m.ListSessions()
The manager provides connection waiting mechanisms to ensure sessions are ready before operations:
// Wait for a specific session to connect
err := m.AddSession(opts)
if err != nil {
log.Fatal(err)
}
// Wait up to 30 seconds for session to be ready
if err := m.WaitForSession("prod-device", 30*time.Second); err != nil {
log.Fatal(err)
}
// Or wait for all sessions to be ready
if err := m.WaitForAllSessions(30*time.Second); err != nil {
log.Fatal(err)
}
Four flexible message handling patterns are available:
// Handle: callback for matching messages from all sessions
route := m.Handle("topic/#", func(msg *mqttx.Message) {
log.Printf("Received: %s", msg.PayloadString())
})
defer route.Stop()
// HandleTo: callback for matching messages from one named session
route, err := m.HandleTo("session-name", "topic/#", func(msg *mqttx.Message) {
log.Printf("Received on session: %s", msg.PayloadString())
})
defer route.Stop()
// Listen: channel of matching messages from all sessions
messages, route := m.Listen("topic/#")
go func() {
for msg := range messages {
log.Printf("Received: %s", msg.PayloadString())
}
}()
defer route.Stop()
// ListenTo: channel of matching messages from one named session
messages, route, err := m.ListenTo("session-name", "topic/#")
go func() {
for msg := range messages {
log.Printf("Received: %s", msg.PayloadString())
}
}()
defer route.Stop()
Multiple independent handlers can be registered on the same topic. Each handler is tracked by a unique subscription ID. Route.Stop() only unbinds its own subscriptions without affecting other consumers on the same topic:
// Use SubscribeWithID to get a subscription ID
session, _ := manager.Session("session-name")
id1, err := session.SubscribeWithID("sensors/+", func(topic string, payload []byte) {
log.Printf("Handler A: %s", string(payload))
}, 1)
// Register a second handler (same topic, does NOT replace the first)
id2, err := session.SubscribeWithID("sensors/+", func(topic string, payload []byte) {
log.Printf("Handler B: %s", string(payload))
}, 1)
// Remove a single handler by ID; broker unsubscribe only happens when the last handler is removed
session.UnsubscribeByID("sensors/+", id1)
// Handler B continues to work normally
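The rule that the broker unsubscribe only happens when the last handler is removed can be modelled with a small registry keyed by topic filter and subscription ID. The sketch below illustrates that bookkeeping under stated assumptions; the registry type and its methods are hypothetical and do not reflect MQTT X internals.

```go
package main

import (
	"fmt"
	"sync"
)

type handler func(topic string, payload []byte)

// registry tracks handlers per topic filter, keyed by subscription ID.
type registry struct {
	mu       sync.Mutex
	nextID   uint64
	handlers map[string]map[uint64]handler
}

func newRegistry() *registry {
	return &registry{handlers: make(map[string]map[uint64]handler)}
}

// add registers h and reports whether it is the first handler for the filter,
// which is the moment a real client would send SUBSCRIBE to the broker.
func (r *registry) add(filter string, h handler) (id uint64, first bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nextID++
	if r.handlers[filter] == nil {
		r.handlers[filter] = make(map[uint64]handler)
		first = true
	}
	r.handlers[filter][r.nextID] = h
	return r.nextID, first
}

// remove deletes one handler and reports whether it was the last one,
// which is the moment a real client would send UNSUBSCRIBE.
func (r *registry) remove(filter string, id uint64) (last bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	hs := r.handlers[filter]
	if _, ok := hs[id]; !ok {
		return false
	}
	delete(hs, id)
	if len(hs) == 0 {
		delete(r.handlers, filter)
		return true
	}
	return false
}

func main() {
	r := newRegistry()
	id1, _ := r.add("sensors/+", func(string, []byte) {})
	id2, _ := r.add("sensors/+", func(string, []byte) {})
	fmt.Println(r.remove("sensors/+", id1)) // false: handler B is still registered
	fmt.Println(r.remove("sensors/+", id2)) // true: last handler removed
}
```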
Multiple Routes can safely subscribe to the same topic without interference:
route1 := m.Handle("sensors/#", handlerA)
route2 := m.Handle("sensors/#", handlerB)
route1.Stop() // Only stops handlerA; handlerB is unaffected
The message forwarder allows automatic message forwarding between different sessions and topics, with support for filtering, transformation, and metadata injection:
// Create forwarder manager
forwarderManager := mqttx.NewForwarderManager(manager)
// Configure forwarder
forwarderConfig := mqttx.ForwarderConfig{
Name: "temperature-forwarder",
SourceSessions: []string{"source-session1", "source-session2"},
SourceTopics: []string{"sensors/+/temperature"},
TargetSession: "target-session",
TopicMapping: map[string]string{
"sensors/living-room/temperature": "processed/temperature/living-room",
},
QoS: 1,
Metadata: map[string]interface{}{
"forwarded_by": "temperature-forwarder",
"timestamp": time.Now().Unix(),
},
Enabled: true,
}
// Add and start forwarder
forwarder, err := forwarderManager.AddForwarder(forwarderConfig)
if err != nil {
log.Fatal(err)
}
// Get forwarder metrics
metrics := forwarder.Metrics()
log.Printf("Messages forwarded: %d", metrics["messages_forwarded"])
// Stop all forwarders
forwarderManager.StopAll()
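One plausible reading of TopicMapping is an exact-match lookup from source topic to target topic. The sketch below assumes that unmapped topics are forwarded under their original name; that fallback is an assumption for illustration, and mapTopic is a hypothetical helper rather than an MQTT X function.

```go
package main

import "fmt"

// mapTopic returns the target topic for a source topic. Topics without an
// entry in the mapping are returned unchanged (assumed behaviour).
func mapTopic(mapping map[string]string, topic string) string {
	if target, ok := mapping[topic]; ok {
		return target
	}
	return topic
}

func main() {
	mapping := map[string]string{
		"sensors/living-room/temperature": "processed/temperature/living-room",
	}
	fmt.Println(mapTopic(mapping, "sensors/living-room/temperature")) // processed/temperature/living-room
	fmt.Println(mapTopic(mapping, "sensors/kitchen/temperature"))     // sensors/kitchen/temperature
}
```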
The forwarder supports the following features:
Monitor session lifecycle and state changes with detailed event information:
// Monitor connection status (returns an unsubscribe func; call it when no longer needed)
unsubReady := m.OnEvent("session_ready", func(event mqttx.Event) {
log.Printf("Session %s is ready for operations", event.Session)
})
defer unsubReady()
// Monitor state changes
unsubState := m.OnEvent("session_state_changed", func(event mqttx.Event) {
// Use a checked assertion so an unexpected payload cannot panic the handler
stateData, ok := event.Data.(map[string]interface{})
if !ok {
return
}
log.Printf("Session %s state changed from %v to %v",
event.Session,
stateData["old_state"],
stateData["new_state"])
})
defer unsubState()
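Because the event payload is carried as an interface{} value, a handler has to assert its concrete type before reading fields. A comma-ok assertion avoids a panic when the payload is not the expected map. The sketch below uses a local Event type with the same fields as the structure documented in this section; describeStateChange is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// Event is a local stand-in with the documented fields.
type Event struct {
	Type      string
	Session   string
	Data      interface{}
	Timestamp time.Time
}

// describeStateChange formats a state change, or returns false when the
// event data is not a map.
func describeStateChange(e Event) (string, bool) {
	data, ok := e.Data.(map[string]interface{})
	if !ok {
		return "", false
	}
	return fmt.Sprintf("%s: %v -> %v", e.Session, data["old_state"], data["new_state"]), true
}

func main() {
	e := Event{
		Type:    "session_state_changed",
		Session: "prod-device",
		Data: map[string]interface{}{
			"old_state": "connecting",
			"new_state": "connected",
		},
		Timestamp: time.Now(),
	}
	if msg, ok := describeStateChange(e); ok {
		fmt.Println(msg) // prod-device: connecting -> connected
	}
}
```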
Available Events:
- session_connecting - Session is attempting to connect
- session_connected - Session has successfully connected
- session_ready - Session is ready for operations
- session_disconnected - Session has disconnected (includes error info if any)
- session_reconnecting - Session is attempting to reconnect
- session_added - New session has been added to the manager
- session_removed - Session has been removed from the manager
- session_state_changed - Session state has changed

Event Data Structure:
type Event struct {
Type string // Event type
Session string // Session name
Data interface{} // Additional event data
Timestamp time.Time // Event timestamp
}
Common Event Data Contents:
- session_connected: Connection details
- session_disconnected: Error information (if any)
- session_state_changed: Map containing "old_state" and "new_state"
- session_reconnecting: Reconnection attempt count
- session_ready: Session configuration summary

opts := &mqttx.Options{
Name: "secure-session",
Brokers: []string{"ssl://broker.example.com:8883"},
ClientID: "secure-client-001",
TLS: &mqttx.TLSConfig{
CAFile: "/path/to/ca.crt",
CertFile: "/path/to/client.crt",
KeyFile: "/path/to/client.key",
SkipVerify: false,
},
}
opts := &mqttx.Options{
Performance: &mqttx.PerformanceOptions{
WriteBufferSize: 4096,
ReadBufferSize: 4096,
MessageChanSize: 1000,
MaxMessageSize: 32 * 1024,
MaxPendingMessages: 5000,
WriteTimeout: time.Second * 30,
ReadTimeout: time.Second * 30,
},
}
opts := &mqttx.Options{
ConnectProps: &mqttx.ConnectProps{
PersistentSession: true,
ResumeSubs: true,
},
}
Monitor session and manager performance:
// Get manager-level metrics
metrics := m.Metrics()
// Get session-specific metrics
session, _ := m.Session("session-name")
sessionMetrics := session.Metrics()
// Get all forwarder metrics
forwarderMetrics := forwarderManager.AllMetrics()
Expose metrics in Prometheus format via HTTP endpoint:
// Create HTTP server to expose Prometheus metrics
go func() {
promExporter := mqttx.NewPrometheusExporter("mqtt", mqttx.WithLabels(map[string]string{
"env": "dev",
}))
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
var output strings.Builder
// Collect manager metrics
metrics := m.Metrics()
output.WriteString(promExporter.Export(metrics))
// Collect all session metrics
for _, name := range m.ListSessions() {
if session, err := m.Session(name); err == nil {
output.WriteString(promExporter.ExportWithSession(name, session.Metrics()))
}
}
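// Content type of the Prometheus text exposition format
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
fmt.Fprint(w, output.String())
})
log.Printf("Starting metrics server on :2112")
if err := http.ListenAndServe(":2112", nil); err != nil {
log.Printf("Metrics server stopped: %v", err)
}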
}()
Add scrape target in Prometheus configuration:
scrape_configs:
  - job_name: 'mqtt_metrics'
    static_configs:
      - targets: ['localhost:2112']
    scrape_interval: 15s
Available Prometheus metrics include:
Message Metrics:
- mqtt_session_messages_sent_total - Total messages sent
- mqtt_session_messages_received_total - Total messages received
- mqtt_session_bytes_sent_total - Total bytes sent
- mqtt_session_bytes_received_total - Total bytes received
- mqtt_session_message_rate - Current messages per second
- mqtt_session_avg_message_rate - Average messages per second since start
- mqtt_session_bytes_rate - Bytes per second

Status Metrics:
- mqtt_session_connected - Session connection status (0/1)
- mqtt_session_status - Session status code
- mqtt_session_subscriptions - Active subscription count
- mqtt_session_errors_total - Total error count
- mqtt_session_reconnects_total - Reconnection attempts

Timestamp Metrics:
- mqtt_session_last_message_timestamp_seconds - Unix timestamp of last message
- mqtt_session_last_error_timestamp_seconds - Unix timestamp of last error

Session Properties:
- mqtt_session_persistent - Persistent session flag (0/1)
- mqtt_session_clean_session - Clean session flag (0/1)
- mqtt_session_auto_reconnect - Auto reconnect flag (0/1)

All metrics include a session="session-name" label for filtering and aggregation by session.
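For orientation, a labelled sample in the Prometheus text exposition format looks like the output of the sketch below. It only illustrates the wire format; formatSample is a hypothetical helper, not the MQTT X exporter, and it relies on Go's %q quoting, which is adequate for simple session names.

```go
package main

import (
	"fmt"
	"strings"
)

// formatSample renders one sample line with a session label.
func formatSample(name, session string, value float64) string {
	return fmt.Sprintf("%s{session=%q} %g\n", name, session, value)
}

func main() {
	var out strings.Builder
	out.WriteString("# TYPE mqtt_session_connected gauge\n")
	out.WriteString(formatSample("mqtt_session_connected", "prod-device", 1))
	out.WriteString("# TYPE mqtt_session_messages_sent_total counter\n")
	out.WriteString(formatSample("mqtt_session_messages_sent_total", "prod-device", 1024))
	fmt.Print(out.String())
}
```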
MQTT X provides intelligent error recovery with automatic classification, circuit breaker pattern, and event-driven retry notification:
Error Classification:
// Check error types
if mqttx.IsTemporary(err) {
// Temporary network error - safe to retry
log.Printf("Temporary error, will retry: %v", err)
} else if mqttx.IsTimeout(err) {
// Timeout error - operation exceeded time limit
log.Printf("Operation timeout: %v", err)
} else if mqttx.IsPermanent(err) {
// Permanent error - retry won't help
log.Printf("Permanent error, no retry: %v", err)
}
Circuit Breaker & Auto-Recovery:
The recovery manager uses a circuit breaker pattern per session+category. When operations succeed, errors are automatically marked as recovered:
// Errors are automatically registered on publish/subscribe failures
// On success, corresponding errors are auto-recovered
err := manager.PublishTo("session", "topic", payload, 1)
// If this succeeds, any prior publish errors for this session are marked recovered
// Listen for retry suggestions via events
manager.OnEvent("error_retry_suggested", func(event mqttx.Event) {
// Use a checked assertion so an unexpected payload cannot panic the handler
data, ok := event.Data.(map[string]interface{})
if !ok {
return
}
log.Printf("Retry suggested for %s: %v (attempt %d/%d)",
event.Session, data["error"], data["retry_count"], data["max_retries"])
})
// Monitor circuit breaker state
stats := manager.ErrorStats()
log.Printf("Active errors: %d", stats["active"])
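A circuit breaker keyed by session and category can be reduced to a failure counter that opens at a threshold and resets on success. The sketch below shows only that core idea. The breaker type, its threshold, and its methods are hypothetical; a production breaker (including whatever MQTT X uses internally) would typically also have a cool-down and a half-open state.

```go
package main

import "fmt"

// breaker counts consecutive failures per session+category and opens once
// the threshold is reached. A success closes the circuit again.
type breaker struct {
	threshold int
	failures  map[string]int
}

func newBreaker(threshold int) *breaker {
	return &breaker{threshold: threshold, failures: make(map[string]int)}
}

func key(session, category string) string { return session + "/" + category }

// fail records a failure and reports whether the circuit is now open.
func (b *breaker) fail(session, category string) bool {
	k := key(session, category)
	b.failures[k]++
	return b.failures[k] >= b.threshold
}

// succeed marks the session+category as recovered.
func (b *breaker) succeed(session, category string) {
	delete(b.failures, key(session, category))
}

// open reports whether calls for this session+category should be skipped.
func (b *breaker) open(session, category string) bool {
	return b.failures[key(session, category)] >= b.threshold
}

func main() {
	b := newBreaker(3)
	for i := 0; i < 3; i++ {
		b.fail("prod-device", "publish")
	}
	fmt.Println(b.open("prod-device", "publish"))   // true
	fmt.Println(b.open("prod-device", "subscribe")) // false
	b.succeed("prod-device", "publish")
	fmt.Println(b.open("prod-device", "publish")) // false
}
```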
Error Recovery Configuration:
opts, err := mqttx.NewSessionBuilder("resilient").
Broker("localhost:1883").
MaxRetries(5). // Max retry attempts
RetryConfig(3, 500*time.Millisecond, true). // Exponential backoff
AutoReconnect().
Build()
MQTT X provides comprehensive support for MQTT 5.0 protocol features:
Enhanced Authentication:
// Custom authentication handler (defined before it is passed to the builder)
authHandler := func(authData []byte) ([]byte, error) {
// Implement custom authentication logic
return processAuth(authData)
}
// MQTT 5.0 enhanced authentication
opts, err := mqttx.NewSessionBuilder("mqtt5-auth").
Broker("mqtt5://broker:1883").
MQTT5EnhancedAuth("SCRAM-SHA-256", authHandler).
Build()
User Properties:
// Add user properties to messages (MQTT 5.0+)
msg := mqttx.NewMessage("topic", []byte("payload")).
WithUserProperty("device_id", "sensor-001").
WithUserProperty("location", "room1").
WithUserProperty("timestamp", time.Now().Format(time.RFC3339))
session.PublishMessage(msg)
Request/Response Pattern:
// MQTT 5.0 request/response
opts, err := mqttx.NewSessionBuilder("request-response").
Broker("mqtt5://broker:1883").
RequestResponseTimeout(10 * time.Second).
Build()
// Send request and wait for response
response, err := session.Request("service/request", []byte("query"), 1)
if err == nil {
log.Printf("Response: %s", response.Payload)
}
Topic Aliases:
// Use topic aliases to reduce bandwidth (MQTT 5.0+)
opts, err := mqttx.NewSessionBuilder("topic-alias").
Broker("mqtt5://broker:1883").
TopicAliasMaximum(100). // Support up to 100 aliases
Build()
session, _ := manager.Session("topic-alias")
// First publish creates alias for long topic
session.Publish("very/long/topic/path/to/sensors/temperature", payload, 1, false)
// Subsequent publishes use alias, reducing bandwidth
session.Publish("very/long/topic/path/to/sensors/temperature", payload, 1, false)
Message Expiry Interval:
// Set message expiry (MQTT 5.0+)
msg := mqttx.NewMessage("sensor/data", []byte("temperature: 23.5")).
WithExpiryInterval(3600). // Expire in 1 hour
WithRetain(true)
session.PublishMessage(msg)
Session Expiry:
// Configure session expiry (MQTT 5.0+)
opts, err := mqttx.NewSessionBuilder("session-expiry").
Broker("mqtt5://broker:1883").
PersistentSession(true).
SessionExpiryInterval(7200). // 2-hour session expiry
Build()
Shared Subscriptions:
// MQTT 5.0 shared subscriptions for load balancing
opts, err := mqttx.NewSessionBuilder("consumer-1").
Broker("mqtt5://broker:1883").
Build()
session, _ := manager.Session("consumer-1")
// Subscribe to shared subscription
route := session.Subscribe("$share/sensor-group/sensors/+/temperature", 1,
func(topic string, payload []byte) {
// Multiple consumers share this subscription
log.Printf("Processing: %s = %s", topic, payload)
})
defer route.Stop()
For more details, see MQTT 5.0 examples.
Support for multiple storage backends:
// Memory storage (default, fastest)
opts, err := mqttx.NewSessionBuilder("memory-session").
Broker("localhost:1883").
Build()
// File storage
opts, err := mqttx.NewSessionBuilder("file-session").
Broker("localhost:1883").
FileStorage("/var/lib/mqttx").
Build()
// Redis storage
opts, err := mqttx.NewSessionBuilder("redis-session").
Broker("localhost:1883").
RedisStorage("localhost:6379").
RedisAuth("user", "pass", 1).
Build()
// High-performance configuration
opts, err := mqttx.NewSessionBuilder("high-perf").
Broker("localhost:1883").
Performance(32, 10000). // 32KB buffer, 10K pending messages
MessageChannelSize(2000). // 2K message channel
KeepAlive(300). // 5-minute keepalive
Timeouts(10, 5). // 10s connect, 5s write timeout
Build()
// Secure connection
opts, err := mqttx.SecureConnect("secure-session", "ssl://broker:8883", "/path/to/ca.crt").
Auth("username", "password").
TLS("/path/to/ca.crt", "/path/to/client.crt", "/path/to/client.key", false).
Build()
MQTT X provides comprehensive test coverage for reliability and concurrent safety:
| Test Type | Coverage | File(s) |
|---|---|---|
| Unit Tests | >90% | *_test.go files |
| Concurrent Safety | 100% | concurrent_test.go (Race Detector verified) |
| Session Operations | >85% | session_*_test.go files |
| Message Forwarding | >80% | forwarder_test.go |
| Error Handling | >85% | error_recovery_test.go |
| Example Code | 100% | examples_smoke_test.go |
# Run all tests
go test ./...
# Run with detailed output
go test -v ./...
# Run with coverage
go test -cover ./...
# Generate HTML coverage report
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out
# Run race condition detection (IMPORTANT for concurrent code)
go test -race ./...
# Run specific test
go test -run TestSessionPublish -v ./...
# Run benchmarks
go test -bench=. -benchmem
# Run performance tests
go test -run TestPerformanceImprovement -v
All MQTT X APIs are verified with Go's Race Detector:
# Run tests with race detection enabled
go test -race ./...
# This verifies:
# ✅ No data races in concurrent operations
# ✅ Proper mutex usage
# ✅ Safe channel operations
# ✅ Atomic variable updates
Note: MQTT X passes all race condition tests. If you encounter race conditions, it indicates a bug that should be reported.
Problem: Session cannot connect to MQTT broker
Checklist:
Solution:
// Enable debug logging
manager.SetLogger(log.New(os.Stdout, "[MQTT] ", log.LstdFlags))
// Check connection status
session, _ := manager.Session("session-name")
log.Printf("Connected: %v", session.IsConnected())
log.Printf("Status: %s", session.Status())
// Use shorter timeout for faster failure detection
opts, err := mqttx.NewSessionBuilder("test").
Broker("localhost:1883").
Timeouts(5, 3). // 5s connect, 3s write
Build()
Problem: TLS connection error with certificate verification
Solution:
// Verify certificate paths
opts, err := mqttx.SecureConnect("secure", "ssl://broker:8883", "/path/to/ca.crt").
TLS("/path/to/ca.crt", "/path/to/client.crt", "/path/to/client.key", false).
Build()
// For development only (not recommended for production)
opts, err := mqttx.SecureConnect("dev", "ssl://broker:8883", "").
TLS("", "", "", true). // Skip verification
Build()
Problem: Published messages not received
Checklist:
+ and #)
Solution:
// Use QoS 1 or 2 for guaranteed delivery
session.Publish("topic", []byte("message"), 1, false) // QoS 1
// Enable session persistence
opts, err := mqttx.NewSessionBuilder("persistent").
Broker("localhost:1883").
PersistentSession(true).
Build()
// Monitor metrics
metrics := session.Metrics()
log.Printf("Sent: %d, Received: %d, Errors: %d",
metrics.MessagesSent, metrics.MessagesReceived, metrics.ErrorCount)
Problem: Memory usage continuously increases
Common Causes:
defer route.Stop()
Solution:
// Always clean up routes
route := manager.Handle("topic/#", handler)
defer route.Stop() // Critical!
// Properly close manager
defer manager.Close()
// Remove unused sessions
manager.RemoveSession("old-session")
// Monitor memory
runtime.GC()
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Printf("Alloc: %v, TotalAlloc: %v", m.Alloc, m.TotalAlloc)
Problem: -race flag detects race conditions
Solution: All MQTT X APIs are verified with Go's Race Detector. If you still encounter race conditions:
# Run tests with race detector
go test -race ./...
For High Throughput:
opts, err := mqttx.NewSessionBuilder("high-perf").
Broker("localhost:1883").
Performance(32, 10000). // 32KB buffer, 10K pending
MessageChannelSize(5000). // 5K message channel
KeepAlive(300). // 5-minute keepalive
Build()
For Low Latency:
opts, err := mqttx.NewSessionBuilder("low-latency").
Broker("localhost:1883").
Performance(4, 100). // Small buffer, few pending
MessageChannelSize(100). // Small channel
KeepAlive(30). // Frequent keepalive
Timeouts(5, 3). // Short timeouts
Build()
For Low Memory:
opts, err := mqttx.NewSessionBuilder("low-memory").
Broker("localhost:1883").
Performance(2, 50). // Minimal buffer
MessageChannelSize(50). // Minimal channel
MaxMessageSize(8 * 1024). // 8KB limit
Build()
If you can't solve the issue:
Resource Management
defer route.Stop() for subscription cleanup
Performance Optimization
HandleTo/ListenTo) when possible
Reliability
Security
Forwarder Usage
MQTT X provides a comprehensive SKILL system to accelerate your development workflow:
mqttx - Comprehensive master SKILL using progressive disclosure
The unified entry point for all MQTT X skills. Perfect when you:
Key Resources:
Skills Overview - Complete guide to all available skills
Quick Start:
Security & Reliability:
Advanced Features:
Development & Testing:
Using Skills with Claude Code:
- "Use mqttx skill to help me get started"
- "Use mqttx-quickstart skill to create a basic MQTT client"
- "Configure TLS security using mqttx-tls-security skill"
- "Show me the learning path for MQTT X"
Skills provide:
We welcome Issues and Pull Requests! Please ensure:
This project is licensed under the MIT License.
Thanks to the following projects for inspiration and support:
MQTT X - Making MQTT client development simpler and more efficient!