logo
0
0
WeChat Login
docs: 修正import路径

Redque

Redque 是一个基于 Redis Stream 实现的支持失败延迟重试功能的队列服务库。

特性

  • 基于 Redis Stream 实现的可靠消息队列
  • 支持消息失败自动重试
  • 支持延迟重试
  • 支持死信队列
  • 支持消息过期清理
  • 支持消费者组模式
  • 支持批量消息处理

安装

go get cnb.cool/libx/redque

快速开始

package main import ( "time" "cnb.cool/libx/redque" "github.com/redis/go-redis/v9" ) func main() { // 创建 Redis 客户端 rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", }) // 创建队列实例 queue := redque.New("consumer_1", redque.Config{ Cli: rdb, Topic: "my_queue", MaxRetries: 3, RetryDelay: time.Second * 5, MaxRetention: time.Hour * 24, }) defer queue.Stop() // 启动队列 if err := queue.Start(); err != nil { panic(err) } // 生产消息 err := queue.Produce(&redque.Message{ MsgType: "user.created", Payload: []byte(`{"user_id": "123"}`), Handle: func(msg *redque.Message) error { // 处理消息 return nil }, }) if err != nil { panic(err) } }

配置选项

type Config struct { // Redis 客户端实例 Cli *redis.Client // 队列主题名称 Topic string // 消费者组名称(默认:default) ConsumerGroup string // 消费者名称 ConsumerName string // 最大重试次数(默认:3) MaxRetries int // 重试延迟时间(默认:5秒) RetryDelay time.Duration // 消息最大保留时间(默认:24小时) MaxRetention time.Duration // 批量获取消息数量(默认:10) BatchSize int // 轮询间隔(默认:1秒) PollInterval time.Duration // 清理间隔(默认:1小时) CleanupInterval time.Duration }

重试机制

当消息处理失败时(Handle 函数返回错误),队列会根据以下规则处理:

  1. 检查消息的剩余重试次数

    • 如果重试次数 > 0,创建新的重试消息
    • 如果重试次数 = 0,消息会被移到死信队列
  2. 重试消息会在指定的延迟时间后被重新处理

    • 可以通过 Message.RetryDelay 为每个消息指定不同的延迟时间
    • 如果未指定,使用队列配置中的 RetryDelay
  3. 死信队列中的消息不会被自动处理,需要手动处理

消息清理

队列会自动清理以下消息:

  1. 已经确认(ACK)的消息
  2. 超过最大保留时间(MaxRetention)的消息
  3. 死信队列中的过期消息

注意事项

  1. 确保 Redis 版本 >= 5.0,因为使用了 Stream 特性

  2. 消息处理函数(Handle)应该是幂等的,因为消息可能会被重试多次

  3. 建议设置合理的 MaxRetention 时间,以防止消息堆积

  4. 在生产环境中,建议配置 Redis 持久化,以防止消息丢失

  5. 如果需要处理大量消息,可以调整以下参数:

    • BatchSize:增加批量处理消息数量
    • PollInterval:减少轮询间隔
    • CleanupInterval:增加清理间隔

许可证

MIT License