logo
Public
0
0
WeChat Login

MQTT X - High-Performance Multi-Session MQTT Client Library

CI PkgGoDev Go Report Card Go Version MIT License GitHub release

🚀 Introduction

MQTT X is a high-performance multi-session MQTT client library designed for Go applications. With deep optimizations, it provides exceptional performance, clean APIs, and powerful features.

✨ Key Features

🏗️ Architecture Optimizations

  • Builder Pattern: Fluent API design that simplifies configuration
  • Efficient Memory Management: Optimized for Go 1.23+ GC, automatic memory management without manual pooling
  • Atomic Operations: 4M+ atomic operations/sec ensuring concurrent safety
  • Unified Error Handling: Structured error types with enhanced error information quality
  • Race-Free Design: All APIs verified with Race Detector for concurrent safety

🎯 Functional Features

  • Multi-Session Management: Concurrent handling of multiple MQTT connections
  • Message Forwarding System: Cross-session and cross-topic message forwarding
  • Auto-Reconnection: Built-in exponential backoff reconnection strategy
  • TLS/SSL Support: Certificate-based secure communication
  • Session Persistence: Support for memory, file, and Redis storage
  • Real-time Monitoring: Detailed performance and health metrics

🔧 Technical Features

  • Thread-safe Design: All operations are concurrency-safe
  • Performance Monitoring: Built-in metrics collection and performance analysis
  • Flexible Configuration: Rich configuration options and tuning parameters
  • Error Recovery: Intelligent error detection and recovery mechanisms

📋 Prerequisites

Before using MQTT X, ensure your environment meets these requirements:

Required

  • Go 1.23+: MQTT X leverages Go 1.23+ features for optimal performance
  • MQTT Broker: Any MQTT 3.1.1 or 5.0 compatible broker (Mosquitto, EMQX, VerneMQ, etc.)

Optional

  • Redis: For distributed session persistence

Verify Go Version

go version # Should display: go version go1.23 or higher

Installation

go get github.com/darkit/mqttx

🚀 Quick Start

Basic Usage

package main import ( "log" "time" "github.com/darkit/mqttx" ) func main() { // Create session manager manager := mqttx.NewSessionManager() defer manager.Close() // Use Builder pattern to create session opts, err := mqttx.QuickConnect("prod-device", "broker.example.com:1883"). Auth("username", "password"). KeepAlive(60). AutoReconnect(). Build() if err != nil { log.Fatal(err) } // Add session (auto-connects) if err := manager.AddSession(opts); err != nil { log.Fatal(err) } // Wait for session to be ready if err := manager.WaitForSession("prod-device", 30 * time.Second); err != nil { log.Printf("Connection warnings: %v", err) } // Publish and subscribe messages session, _ := manager.Session("prod-device") // Subscribe to topic handler := func(topic string, payload []byte) { log.Printf("Received: %s = %s", topic, string(payload)) } session.Subscribe("sensors/+/temperature", 1, handler) // Publish message session.Publish("sensors/room1/temperature", []byte("23.5"), 1, false) select {} // Keep running }

Broadcast Publishing

// Broadcast to all sessions (non-retained) errs := manager.PublishToAll("alerts/system", []byte("maintenance starting"), 1) // Broadcast a retained message to all sessions errs = manager.PublishRetainedToAll("alerts/system", []byte("latest bulletin"), 1) if len(errs) > 0 { log.Printf("broadcast failed on %d sessions", len(errs)) }

📚 Core Concepts

Builder Pattern

MQTT X provides fluent APIs to simplify configuration:

// Quick connect opts, err := mqttx.QuickConnect("session-name", "localhost:1883").Build() // Secure connect opts, err := mqttx.SecureConnect("secure-session", "ssl://broker:8883", "/path/to/ca.crt"). Auth("user", "pass"). KeepAlive(60). Build() // Complex configuration opts, err := mqttx.NewSessionBuilder("production-session"). Brokers("tcp://broker1:1883", "tcp://broker2:1883"). ClientID("client-001"). Auth("admin", "secret"). TLS("/etc/ssl/ca.crt", "/etc/ssl/client.crt", "/etc/ssl/client.key", false). Performance(16, 5000). RedisStorage("localhost:6379"). Subscribe("sensors/+", 1, handler). Build()

Message Forwarding

Automatic message forwarding between sessions:

// Create forwarder config, err := mqttx.NewForwarderBuilder("sensor-forwarder"). Source("sensor-session", "sensors/+/temperature"). Target("storage-session"). QoS(1). MapTopic("sensors/room1/temperature", "storage/room1/temp"). Build() forwarder, err := mqttx.NewForwarder(config, manager) forwarder.Start()

Error Handling

Unified error handling mechanism:

// Check error types if mqttx.IsTemporary(err) { // Temporary error, can retry log.Printf("Temporary error: %v", err) } else if mqttx.IsTimeout(err) { // Timeout error log.Printf("Timeout error: %v", err) } // Create custom error err := mqttx.NewConnectionError("connection failed", originalErr). WithSession("my-session"). WithContext("retry_count", 3)

📊 Performance Metrics

MQTT X performance on standard hardware (Go 1.23+, Intel i7-12700H):

MetricPerformanceNotes
Message Throughput100K+ messages/secSingle session max throughput
Atomic Operations4M+ operations/secMetrics update performance
Concurrent Sessions1000+ sessionsPer-process capacity
Memory Efficiency< 5 bytes/metricPer-metric object overhead
Forwarder Performance500K+ lifecycles/secMessage forwarding throughput
Message Latency< 1msPublish to receive (local broker)

Architecture Improvements (Phase 2)

  • Memory Management Optimized: Removed manual object pooling, Go 1.23+ GC provides optimal performance
  • Concurrent Safety Enhanced: All operations verified with Race Detector
  • Error Recovery Improved: Intelligent error classification and automatic recovery
  • Code Simplified: Cleaner architecture without object pool complexity

Performance Monitoring

// Global metrics globalMetrics := manager.Metrics() log.Printf("Total messages: %d, Errors: %d", globalMetrics.TotalMessages, globalMetrics.ErrorCount) // Session metrics sessionMetrics := session.Metrics() log.Printf("Sent: %d, Received: %d", sessionMetrics.MessagesSent, sessionMetrics.MessagesReceived) // Forwarder metrics forwarderMetrics := forwarder.Metrics() log.Printf("Forwarded: %d, Dropped: %d", forwarderMetrics.MessagesSent, forwarderMetrics.MessagesDropped)

Core Components

Session Manager

The session manager (Manager) is the central component that handles multiple MQTT sessions:

// Create a new manager m := manager.NewSessionManager() // Add a session err := m.AddSession(&manager.Options{...}) // Get session status status := m.AllSessionsStatus() // Remove a session err := m.RemoveSession("session-name") // List all sessions sessions := m.ListSessions()

Connection Management

The manager provides connection waiting mechanisms to ensure sessions are ready before operations:

// Wait for a specific session to connect err := m.AddSession(opts) if err != nil { log.Fatal(err) } // Wait up to 30 seconds for session to be ready if err := m.WaitForSession("prod-device", 30*time.Second); err != nil { log.Fatal(err) } // Or wait for all sessions to be ready if err := m.WaitForAllSessions(30*time.Second); err != nil { log.Fatal(err) }

Message Handling

Four flexible message handling patterns are available:

  1. Handle - Global callback-based handling:
route := m.Handle("topic/#", func(msg *manager.Message) { log.Printf("Received: %s", msg.PayloadString()) }) defer route.Stop()
  1. HandleTo - Session-specific callback handling:
route, err := m.HandleTo("session-name", "topic/#", func(msg *manager.Message) { log.Printf("Received on session: %s", msg.PayloadString()) }) defer route.Stop()
  1. Listen - Channel-based message reception:
messages, route := m.Listen("topic/#") go func() { for msg := range messages { log.Printf("Received: %s", msg.PayloadString()) } }() defer route.Stop()
  1. ListenTo - Session-specific channel reception:
messages, route, err := m.ListenTo("session-name", "topic/#") go func() { for msg := range messages { log.Printf("Received: %s", msg.PayloadString()) } }() defer route.Stop()

Multi-Handler Subscription Model

Multiple independent handlers can be registered on the same topic. Each handler is tracked by a unique subscription ID. Route.Stop() only unbinds its own subscriptions without affecting other consumers on the same topic:

// Use SubscribeWithID to get a subscription ID session, _ := manager.Session("session-name") id1, err := session.SubscribeWithID("sensors/+", func(topic string, payload []byte) { log.Printf("Handler A: %s", string(payload)) }, 1) // Register a second handler (same topic, does NOT replace the first) id2, err := session.SubscribeWithID("sensors/+", func(topic string, payload []byte) { log.Printf("Handler B: %s", string(payload)) }, 1) // Remove a single handler by ID; broker unsubscribe only happens when the last handler is removed session.UnsubscribeByID("sensors/+", id1) // Handler B continues to work normally

Multiple Routes can safely subscribe to the same topic without interference:

route1 := m.Handle("sensors/#", handlerA) route2 := m.Handle("sensors/#", handlerB) route1.Stop() // Only stops handlerA; handlerB is unaffected

Message Forwarder

The message forwarder allows automatic message forwarding between different sessions and topics, with support for filtering, transformation, and metadata injection:

// Create forwarder manager forwarderManager := mqttx.NewForwarderManager(manager) // Configure forwarder forwarderConfig := mqttx.ForwarderConfig{ Name: "temperature-forwarder", SourceSessions: []string{"source-session1", "source-session2"}, SourceTopics: []string{"sensors/+/temperature"}, TargetSession: "target-session", TopicMapping: map[string]string{ "sensors/living-room/temperature": "processed/temperature/living-room", }, QoS: 1, Metadata: map[string]interface{}{ "forwarded_by": "temperature-forwarder", "timestamp": time.Now().Unix(), }, Enabled: true, } // Add and start forwarder forwarder, err := forwarderManager.AddForwarder(forwarderConfig) if err != nil { log.Fatal(err) } // Get forwarder metrics metrics := forwarder.Metrics() log.Printf("Messages forwarded: %d", metrics["messages_forwarded"]) // Stop all forwarders forwarderManager.StopAll()

The forwarder supports the following features:

  1. Multi-source Forwarding - Subscribe to messages from multiple sessions
  2. Topic Mapping - Map source topics to different target topics
  3. Message Filtering - Filter messages based on topic or content
  4. Message Transformation - Transform message content before forwarding
  5. Metadata Injection - Add metadata to forwarded messages
  6. Performance Metrics - Provide detailed forwarding statistics

Event System

Monitor session lifecycle and state changes with detailed event information:

// Monitor connection status (returns an unsubscribe func; call it when no longer needed) unsubReady := m.OnEvent("session_ready", func(event manager.Event) { log.Printf("Session %s is ready for operations", event.Session) }) defer unsubReady() // Monitor state changes unsubState := m.OnEvent("session_state_changed", func(event manager.Event) { stateData := event.Data.(map[string]interface{}) log.Printf("Session %s state changed from %v to %v", event.Session, stateData["old_state"], stateData["new_state"]) }) defer unsubState()

Available Events:

  • session_connecting - Session is attempting to connect
  • session_connected - Session has successfully connected
  • session_ready - Session is ready for operations
  • session_disconnected - Session has disconnected (includes error info if any)
  • session_reconnecting - Session is attempting to reconnect
  • session_added - New session has been added to the manager
  • session_removed - Session has been removed from the manager
  • session_state_changed - Session state has changed

Event Data Structure:

type Event struct { Type string // Event type Session string // Session name Data interface{} // Additional event data Timestamp time.Time // Event timestamp }

Common Event Data Contents:

  • session_connected: Connection details
  • session_disconnected: Error information (if any)
  • session_state_changed: Map containing "old_state" and "new_state"
  • session_reconnecting: Reconnection attempt count
  • session_ready: Session configuration summary

Advanced Configuration

TLS Security

opts := &manager.Options{ Name: "secure-session", Brokers: []string{"ssl://broker.example.com:8883"}, ClientID: "secure-client-001", TLS: &manager.TLSConfig{ CAFile: "/path/to/ca.crt", CertFile: "/path/to/client.crt", KeyFile: "/path/to/client.key", SkipVerify: false, }, }

Performance Tuning

opts := &manager.Options{ Performance: &manager.PerformanceOptions{ WriteBufferSize: 4096, ReadBufferSize: 4096, MessageChanSize: 1000, MaxMessageSize: 32 * 1024, MaxPendingMessages: 5000, WriteTimeout: time.Second * 30, ReadTimeout: time.Second * 30, }, }

Session Persistence

opts := &manager.Options{ ConnectProps: &manager.ConnectProps{ PersistentSession: true, ResumeSubs: true, }, }

Metrics Collection

Monitor session and manager performance:

// Get manager-level metrics metrics := m.Metrics() // Get session-specific metrics session, _ := m.Session("session-name") sessionMetrics := session.Metrics() // Get all forwarder metrics forwarderMetrics := forwarderManager.AllMetrics()
Prometheus Integration

Expose metrics in Prometheus format via HTTP endpoint:

// Create HTTP server to expose Prometheus metrics go func() { promExporter := manager.NewPrometheusExporter("mqtt", manager.WithLabels(map[string]string{ "env": "dev", })) http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { var output strings.Builder // Collect manager metrics metrics := m.Metrics() output.WriteString(promExporter.Export(metrics)) // Collect all session metrics for _, name := range m.ListSessions() { if session, err := m.Session(name); err == nil { output.WriteString(promExporter.ExportWithSession(name, session.Metrics())) } } w.Header().Set("Content-Type", "text/plain") fmt.Fprint(w, output.String()) }) log.Printf("Starting metrics server on :2112") http.ListenAndServe(":2112", nil) }()

Add scrape target in Prometheus configuration:

scrape_configs: - job_name: 'mqtt_metrics' static_configs: - targets: ['localhost:2112'] scrape_interval: 15s

Available Prometheus metrics include:

Message Metrics:

  • mqtt_session_messages_sent_total - Total messages sent
  • mqtt_session_messages_received_total - Total messages received
  • mqtt_session_bytes_sent_total - Total bytes sent
  • mqtt_session_bytes_received_total - Total bytes received
  • mqtt_session_message_rate - Current messages per second
  • mqtt_session_avg_message_rate - Average messages per second since start
  • mqtt_session_bytes_rate - Bytes per second

Status Metrics:

  • mqtt_session_connected - Session connection status (0/1)
  • mqtt_session_status - Session status code
  • mqtt_session_subscriptions - Active subscription count
  • mqtt_session_errors_total - Total error count
  • mqtt_session_reconnects_total - Reconnection attempts

Timestamp Metrics:

  • mqtt_session_last_message_timestamp_seconds - Unix timestamp of last message
  • mqtt_session_last_error_timestamp_seconds - Unix timestamp of last error

Session Properties:

  • mqtt_session_persistent - Persistent session flag (0/1)
  • mqtt_session_clean_session - Clean session flag (0/1)
  • mqtt_session_auto_reconnect - Auto reconnect flag (0/1)

All metrics include a session="session-name" label for filtering and aggregation by session.

🔧 Advanced Features

Error Recovery Mechanism

MQTT X provides intelligent error recovery with automatic classification, circuit breaker pattern, and event-driven retry notification:

Error Classification:

// Check error types if mqttx.IsTemporary(err) { // Temporary network error - safe to retry log.Printf("Temporary error, will retry: %v", err) } else if mqttx.IsTimeout(err) { // Timeout error - operation exceeded time limit log.Printf("Operation timeout: %v", err) } else if mqttx.IsPermanent(err) { // Permanent error - retry won't help log.Printf("Permanent error, no retry: %v", err) }

Circuit Breaker & Auto-Recovery:

The recovery manager uses a circuit breaker pattern per session+category. When operations succeed, errors are automatically marked as recovered:

// Errors are automatically registered on publish/subscribe failures // On success, corresponding errors are auto-recovered err := manager.PublishTo("session", "topic", payload, 1) // If this succeeds, any prior publish errors for this session are marked recovered // Listen for retry suggestions via events manager.OnEvent("error_retry_suggested", func(event mqttx.Event) { data := event.Data.(map[string]interface{}) log.Printf("Retry suggested for %s: %v (attempt %d/%d)", event.Session, data["error"], data["retry_count"], data["max_retries"]) }) // Monitor circuit breaker state stats := manager.ErrorStats() log.Printf("Active errors: %d", stats["active"])

Error Recovery Configuration:

opts := mqttx.NewSessionBuilder("resilient"). Broker("localhost:1883"). MaxRetries(5). // Max retry attempts RetryConfig(3, 500*time.Millisecond, true). // Exponential backoff AutoReconnect(). Build()

MQTT 5.0 Support

MQTT X provides comprehensive support for MQTT 5.0 protocol features:

Enhanced Authentication:

// MQTT 5.0 enhanced authentication opts := mqttx.NewSessionBuilder("mqtt5-auth"). Broker("mqtt5://broker:1883"). MQTT5EnhancedAuth("SCRAM-SHA-256", authHandler). Build() // Custom authentication handler authHandler := func(authData []byte) ([]byte, error) { // Implement custom authentication logic return processAuth(authData) }

User Properties:

// Add user properties to messages (MQTT 5.0+) msg := mqttx.NewMessage("topic", []byte("payload")). WithUserProperty("device_id", "sensor-001"). WithUserProperty("location", "room1"). WithUserProperty("timestamp", time.Now().Format(time.RFC3339)) session.PublishMessage(msg)

Request/Response Pattern:

// MQTT 5.0 request/response opts := mqttx.NewSessionBuilder("request-response"). Broker("mqtt5://broker:1883"). RequestResponseTimeout(10 * time.Second). Build() // Send request and wait for response response, err := session.Request("service/request", []byte("query"), 1) if err == nil { log.Printf("Response: %s", response.Payload) }

Topic Aliases:

// Use topic aliases to reduce bandwidth (MQTT 5.0+) opts := mqttx.NewSessionBuilder("topic-alias"). Broker("mqtt5://broker:1883"). TopicAliasMaximum(100). // Support up to 100 aliases Build() session, _ := manager.Session("topic-alias") // First publish creates alias for long topic session.Publish("very/long/topic/path/to/sensors/temperature", payload, 1, false) // Subsequent publishes use alias, reducing bandwidth session.Publish("very/long/topic/path/to/sensors/temperature", payload, 1, false)

Message Expiry Interval:

// Set message expiry (MQTT 5.0+) msg := mqttx.NewMessage("sensor/data", []byte("temperature: 23.5")). WithExpiryInterval(3600). // Expire in 1 hour WithRetain(true) session.PublishMessage(msg)

Session Expiry:

// Configure session expiry (MQTT 5.0+) opts := mqttx.NewSessionBuilder("session-expiry"). Broker("mqtt5://broker:1883"). PersistentSession(true). SessionExpiryInterval(7200). // 2-hour session expiry Build()

Shared Subscriptions:

// MQTT 5.0 shared subscriptions for load balancing opts := mqttx.NewSessionBuilder("consumer-1"). Broker("mqtt5://broker:1883"). Build() session, _ := manager.Session("consumer-1") // Subscribe to shared subscription route := session.Subscribe("$share/sensor-group/sensors/+/temperature", 1, func(topic string, payload []byte) { // Multiple consumers share this subscription log.Printf("Processing: %s = %s", topic, payload) }) defer route.Stop()

For more details, see MQTT 5.0 examples.

Session Persistence

Support for multiple storage backends:

// Memory storage (default, fastest) opts := mqttx.NewSessionBuilder("memory-session"). Broker("localhost:1883"). Build() // File storage opts := mqttx.NewSessionBuilder("file-session"). Broker("localhost:1883"). FileStorage("/var/lib/mqttx"). Build() // Redis storage opts := mqttx.NewSessionBuilder("redis-session"). Broker("localhost:1883"). RedisStorage("localhost:6379"). RedisAuth("user", "pass", 1). Build()

Performance Tuning

// High-performance configuration opts := mqttx.NewSessionBuilder("high-perf"). Broker("localhost:1883"). Performance(32, 10000). // 32KB buffer, 10K pending messages MessageChannelSize(2000). // 2K message channel KeepAlive(300). // 5-minute keepalive Timeouts(10, 5). // 10s connect, 5s write timeout Build()

TLS Security

// Secure connection opts := mqttx.SecureConnect("secure-session", "ssl://broker:8883", "/path/to/ca.crt"). Auth("username", "password"). TLS("/path/to/ca.crt", "/path/to/client.crt", "/path/to/client.key", false). Build()

🧪 Testing

Test Coverage

MQTT X provides comprehensive test coverage for reliability and concurrent safety:

Test TypeCoverageFile(s)
Unit Tests>90%*_test.go files
Concurrent Safety100%concurrent_test.go (Race Detector verified)
Session Operations>85%session_*_test.go files
Message Forwarding>80%forwarder_test.go
Error Handling>85%error_recovery_test.go
Example Code100%examples_smoke_test.go

Running Tests

# Run all tests go test ./... # Run with detailed output go test -v ./... # Run with coverage go test -cover ./... # Generate HTML coverage report go test -coverprofile=coverage.out ./... go tool cover -html=coverage.out # Run race condition detection (IMPORTANT for concurrent code) go test -race ./... # Run specific test go test -run TestSessionPublish -v ./... # Run benchmarks go test -bench=. -benchmem # Run performance tests go test -run TestPerformanceImprovement -v

Concurrent Safety Verification

All MQTT X APIs are verified with Go's Race Detector:

# Run tests with race detection enabled go test -race ./... # This verifies: # ✅ No data races in concurrent operations # ✅ Proper mutex usage # ✅ Safe channel operations # ✅ Atomic variable updates

Note: MQTT X passes all race condition tests. If you encounter race conditions, it indicates a bug that should be reported.

🔧 Troubleshooting

Connection Issues

Problem: Session cannot connect to MQTT broker

Checklist:

  • ✅ Verify broker address and port
  • ✅ Check network connectivity
  • ✅ Verify firewall settings
  • ✅ Check authentication credentials

Solution:

// Enable debug logging manager.SetLogger(log.New(os.Stdout, "[MQTT] ", log.LstdFlags)) // Check connection status session, _ := manager.Session("session-name") log.Printf("Connected: %v", session.IsConnected()) log.Printf("Status: %s", session.Status()) // Use shorter timeout for faster failure detection opts := mqttx.NewSessionBuilder("test"). Broker("localhost:1883"). Timeouts(5, 3). // 5s connect, 3s write Build()

Certificate Verification Failed

Problem: TLS connection error with certificate verification

Solution:

// Verify certificate paths opts := mqttx.SecureConnect("secure", "ssl://broker:8883", "/path/to/ca.crt"). TLS("/path/to/ca.crt", "/path/to/client.crt", "/path/to/client.key", false). Build() // For development only (not recommended for production) opts := mqttx.SecureConnect("dev", "ssl://broker:8883", ""). TLS("", "", "", true). // Skip verification Build()

Message Loss

Problem: Published messages not received

Checklist:

  • ✅ Verify subscription topic pattern (watch wildcards + and #)
  • ✅ Check QoS level setting
  • ✅ Enable session persistence if needed
  • ✅ Verify network stability

Solution:

// Use QoS 1 or 2 for guaranteed delivery session.Publish("topic", []byte("message"), 1, false) // QoS 1 // Enable session persistence opts := mqttx.NewSessionBuilder("persistent"). Broker("localhost:1883"). PersistentSession(true). Build() // Monitor metrics metrics := session.Metrics() log.Printf("Sent: %d, Received: %d, Errors: %d", metrics.MessagesSent, metrics.MessagesReceived, metrics.ErrorCount)

Memory Leak

Problem: Memory usage continuously increases

Common Causes:

  • Forgetting to stop routes: defer route.Stop()
  • Event handlers not cleaned up
  • Sessions not properly closed

Solution:

// Always clean up routes route := manager.Handle("topic/#", handler) defer route.Stop() // Critical! // Properly close manager defer manager.Close() // Remove unused sessions manager.RemoveSession("old-session") // Monitor memory runtime.GC() var m runtime.MemStats runtime.ReadMemStats(&m) log.Printf("Alloc: %v, TotalAlloc: %v", m.Alloc, m.TotalAlloc)

Race Conditions

Problem: -race flag detects race conditions

Solution: MQTT X has all APIs verified with Race Detector. If you encounter race conditions:

  1. Update to the latest version
  2. Enable debug logging to identify the issue
  3. Report with reproduction steps: Issues
# Run tests with race detector go test -race ./...

Performance Tuning

For High Throughput:

opts := mqttx.NewSessionBuilder("high-perf"). Broker("localhost:1883"). Performance(32, 10000). // 32KB buffer, 10K pending MessageChannelSize(5000). // 5K message channel KeepAlive(300). // 5-minute keepalive Build()

For Low Latency:

opts := mqttx.NewSessionBuilder("low-latency"). Broker("localhost:1883"). Performance(4, 100). // Small buffer, few pending MessageChannelSize(100). // Small channel KeepAlive(30). // Frequent keepalive Timeouts(5, 3). // Short timeouts Build()

For Low Memory:

opts := mqttx.NewSessionBuilder("low-memory"). Broker("localhost:1883"). Performance(2, 50). // Minimal buffer MessageChannelSize(50). // Minimal channel MaxMessageSize(8 * 1024). // 8KB limit Build()

Getting Help

If you can't solve the issue:

  1. Check Issues for similar problems
  2. Read API Guide
  3. Review examples
  4. Submit new Issue with:
    • Go version
    • OS and architecture
    • Reproduction steps
    • Error logs and stack traces

Best Practices

  1. Resource Management

    • Always use defer route.Stop() for subscription cleanup
    • Implement proper error handling
    • Use meaningful session names and client IDs
  2. Performance Optimization

    • Configure appropriate buffer sizes for your use case
    • Use session-specific subscriptions (HandleTo/ListenTo) when possible
    • Monitor metrics to identify bottlenecks
    • Compare current and average message rates to identify traffic patterns
    • Use metrics data for capacity planning and performance tuning
  3. Reliability

    • Enable automatic reconnection for production use
    • Implement proper error handling and retry mechanisms
    • Use QoS levels appropriate for your use case
  4. Security

    • Enable TLS in production environments
    • Use strong client authentication
    • Regularly rotate credentials
  5. Forwarder Usage

    • Set appropriate buffer sizes for forwarders to avoid message loss
    • Use filters to reduce unnecessary message forwarding
    • Monitor forwarder metrics to detect issues early
    • Design appropriate topic mapping strategies for complex scenarios

📖 Documentation

Core Documentation

🎯 Claude Code Skills

MQTT X provides a comprehensive SKILL system to accelerate your development workflow:

🎯 Main Entry Point

mqttx - Comprehensive master SKILL using progressive disclosure

The unified entry point for all MQTT X skills. Perfect when you:

  • 🆕 Are new to MQTT X and unsure where to start
  • 🔍 Need help choosing the right skill for your use case
  • 📚 Want to follow a structured learning path
  • 🛠️ Need quick diagnostics and environment validation

Key Resources:

📚 Specialized Skills (10)

Skills Overview - Complete guide to all available skills

Quick Start:

Security & Reliability:

Advanced Features:

Development & Testing:

Using Skills with Claude Code:

"Use mqttx skill to help me get started" "Use mqttx-quickstart skill to create a basic MQTT client" "Configure TLS security using mqttx-tls-security skill" "Show me the learning path for MQTT X"

Skills provide:

  • ✅ Intelligent navigation with decision trees
  • ✅ Step-by-step guided workflows
  • ✅ Production-ready code templates
  • ✅ Best practices and troubleshooting guides
  • ✅ Complete working examples
  • ✅ Automated environment validation

🤝 Contributing

We welcome Issues and Pull Requests! Please ensure:

  1. Code passes all tests
  2. Follow existing code style
  3. Add necessary test cases
  4. Update relevant documentation

📄 License

This project is licensed under the MIT License.

🏆 Acknowledgments

Thanks to the following projects for inspiration and support:


MQTT X - Making MQTT client development simpler and more efficient!

About

No description, topics, or website provided.
31.25 MiB
0 forks0 stars2 branches0 TagREADMEMIT license
Language
Go99.3%
Makefile0.7%