logo
0
0
WeChat Login

Redque

Redque 是一个基于 Redis Stream 实现的支持失败延迟重试功能的队列服务库。

特性

  • 基于 Redis Stream 实现的可靠消息队列
  • 支持消息失败自动重试
  • 支持延迟重试
  • 支持死信队列
  • 支持消息过期清理
  • 支持消费者组模式
  • 支持批量消息处理

安装

go get cnb.cool/libx/redque

快速开始

package main

import (
    "time"
    "cnb.cool/libx/redque"
    "github.com/redis/go-redis/v9"
)

func main() {
    // 创建 Redis 客户端
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    // 创建队列实例
    queue := redque.New("consumer_1", redque.Config{
        Cli:          rdb,
        Topic:        "my_queue",
        MaxRetries:   3,
        RetryDelay:   time.Second * 5,
        MaxRetention: time.Hour * 24,
    })
    defer queue.Stop()

    // 启动队列
    if err := queue.Start(); err != nil {
        panic(err)
    }

    // 生产消息
    err := queue.Produce(&redque.Message{
        MsgType: "user.created",
        Payload: []byte(`{"user_id": "123"}`),
        Handle: func(msg *redque.Message) error {
            // 处理消息
            return nil
        },
    })
    if err != nil {
        panic(err)
    }
}

配置选项

type Config struct {
    // Redis 客户端实例
    Cli *redis.Client

    // 队列主题名称
    Topic string

    // 消费者组名称(默认:default)
    ConsumerGroup string

    // 消费者名称
    ConsumerName string

    // 最大重试次数(默认:3)
    MaxRetries int

    // 重试延迟时间(默认:5秒)
    RetryDelay time.Duration

    // 消息最大保留时间(默认:24小时)
    MaxRetention time.Duration

    // 批量获取消息数量(默认:10)
    BatchSize int

    // 轮询间隔(默认:1秒)
    PollInterval time.Duration

    // 清理间隔(默认:1小时)
    CleanupInterval time.Duration
}

重试机制

当消息处理失败时(Handle 函数返回错误),队列会根据以下规则处理:

  1. 检查消息的剩余重试次数

    • 如果重试次数 > 0,创建新的重试消息
    • 如果重试次数 = 0,消息会被移到死信队列
  2. 重试消息会在指定的延迟时间后被重新处理

    • 可以通过 Message.RetryDelay 为每个消息指定不同的延迟时间
    • 如果未指定,使用队列配置中的 RetryDelay
  3. 死信队列中的消息不会被自动处理,需要手动处理

消息清理

队列会自动清理以下消息:

  1. 已经确认(ACK)的消息
  2. 超过最大保留时间(MaxRetention)的消息
  3. 死信队列中的过期消息

注意事项

  1. 确保 Redis 版本 >= 5.0,因为使用了 Stream 特性

  2. 消息处理函数(Handle)应该是幂等的,因为消息可能会被重试多次

  3. 建议设置合理的 MaxRetention 时间,以防止消息堆积

  4. 在生产环境中,建议配置 Redis 持久化,以防止消息丢失

  5. 如果需要处理大量消息,可以调整以下参数:

    • BatchSize:增加批量处理消息数量
    • PollInterval:减少轮询间隔
    • CleanupInterval:增加清理间隔

许可证

MIT License

About

使用 Redis Stream 实现的支持延时、重试、死信队列的消息队列。

Language
Go86.6%
Markdown12.5%
Dockerfile0.7%
gitignore0.2%