Redque 是一个基于 Redis Stream 实现的支持失败延迟重试功能的队列服务库。
go get cnb.cool/libx/redque
package main
import (
"time"
"cnb.cool/libx/redque"
"github.com/redis/go-redis/v9"
)
func main() {
// 创建 Redis 客户端
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
// 创建队列实例
queue := redque.New("consumer_1", redque.Config{
Cli: rdb,
Topic: "my_queue",
MaxRetries: 3,
RetryDelay: time.Second * 5,
MaxRetention: time.Hour * 24,
})
defer queue.Stop()
// 启动队列
if err := queue.Start(); err != nil {
panic(err)
}
// 生产消息
err := queue.Produce(&redque.Message{
MsgType: "user.created",
Payload: []byte(`{"user_id": "123"}`),
Handle: func(msg *redque.Message) error {
// 处理消息
return nil
},
})
if err != nil {
panic(err)
}
}
type Config struct {
// Redis 客户端实例
Cli *redis.Client
// 队列主题名称
Topic string
// 消费者组名称(默认:default)
ConsumerGroup string
// 消费者名称
ConsumerName string
// 最大重试次数(默认:3)
MaxRetries int
// 重试延迟时间(默认:5秒)
RetryDelay time.Duration
// 消息最大保留时间(默认:24小时)
MaxRetention time.Duration
// 批量获取消息数量(默认:10)
BatchSize int
// 轮询间隔(默认:1秒)
PollInterval time.Duration
// 清理间隔(默认:1小时)
CleanupInterval time.Duration
}
当消息处理失败时(Handle 函数返回错误),队列会根据以下规则处理:
检查消息的剩余重试次数
重试消息会在指定的延迟时间后被重新处理
死信队列中的消息不会被自动处理,需要手动处理
队列会自动清理以下消息:
确保 Redis 版本 >= 5.0,因为使用了 Stream 特性
消息处理函数(Handle)应该是幂等的,因为消息可能会被重试多次
建议设置合理的 MaxRetention 时间,以防止消息堆积
在生产环境中,建议配置 Redis 持久化,以防止消息丢失
如果需要处理大量消息,可以调整以下参数:
MIT License