logo
Public
0
0
WeChat Login
feat: 增强 API 和 Dashboard 安全功能

Cron 计划任务调度库

GoDoc Go Report Card CI MIT License

一个轻量级、安全可靠的 Go 语言计划任务调度库。

特性

  • 简洁的 API 设计 - 提供 Schedule、SafeSchedule 等直观易用的方法
  • 完善的安全保障 - 内置 panic 捕获和异常恢复机制,确保程序稳定运行
  • 并发安全保证 - 通过 -race 检测器验证,无数据竞争,支持高并发场景
  • 灵活的日志系统 - 支持自定义 Logger 接口,默认使用 log/slog 结构化日志
  • 上下文支持 - 所有任务函数支持 context.Context 进行取消和超时控制
  • 基础监控功能 - 提供任务执行统计和状态查询,包括执行时长、goroutine 监控
  • 多种调度方式 - 支持函数调度和 Job 接口调度
  • 运行时管理 - 支持动态添加、删除任务
  • 基础配置选项 - 支持异步执行、超时控制、并发限制
  • 智能重试机制 - 支持固定次数重试、无限重试、立即重试和自定义重试间隔
  • 历史记录功能 - 基于标准库的 KV 存储,支持任务执行历史查询、统计和清理
  • Web Dashboard - 独立子模块(可选,需单独 go get github.com/darkit/cron/dashboard),不影响主库依赖
  • Cron 表达式支持 - 兼容标准 5 段和 6 段 cron 表达式,支持@hourly@daily@every 等描述符,以及 L/W/# 高级语法
  • 时区表达式 - 直接支持 TZ= / CRON_TZ= 前缀,轻松在不同时区调度任务
  • 自动注册功能 - 支持 Job 的自动注册和批量调度

安装

go get github.com/darkit/cron

快速开始

运行时控制与事件(新增能力速览)

  • RunNow(id) 立即触发一次
  • Update(id, spec) / Pause(id) / Resume(id) 运行时修改、暂停、恢复
  • StopGracefully(timeout) 优雅停机等待在途任务
  • JobOptionsMisfirePolicy (skip/once/catchup)、失败熔断(FailThreshold/FailWindow/PauseDuration)、Labels(任务标签)
  • WithEventHook(func(Event)) 任务开始/结束回调,可输出日志/指标
  • 可直接使用 NewEventLoggerHook 将事件写入日志,或 NewEventChannelHook 推送到自定义 channel
  • 历史存储:默认 JSONL <task>/<date>.jsonl,便于 tail/grep;如需旧格式可实现自定义 Storage
  • 历史清理:可调用 CleanupHistory(time.Now().Add(-30*24*time.Hour)),或使用脚本 scripts/cleanup_history.sh <dir> <keep_days> 定期清理
  • JobRegistry:全局注册仅兼容;推荐 reg := cron.NewJobRegistry()reg.SafeRegister(job)c.ScheduleFromRegistry(reg)

常用 curl 示例(Dashboard API)

  • 立即执行:curl -X POST http://localhost:8080/api/tasks/{id}/run
  • 暂停:curl -X POST http://localhost:8080/api/tasks/{id}/pause
  • 恢复:curl -X POST http://localhost:8080/api/tasks/{id}/resume
  • 更新表达式:curl -X PATCH -H 'Content-Type: application/json' -d '{"schedule":"*/10 * * * * *"}' http://localhost:8080/api/tasks/{id}/schedule

历史存储与保留

  • 默认 JSONL 存储(按任务/日期分片),可通过 CleanupHistory(before) 清理旧记录。
  • 如需限制磁盘,可定期调用 CleanupHistory 或实现自定义 Storage(接口兼容)。

JobRegistry 推荐用法

  • 推荐使用实例化注册表:reg := cron.NewJobRegistry(),然后 reg.register(...)c.ScheduleFromRegistry(reg);全局注册仅用于示例。

最简单的使用方式

package main import ( "context" "fmt" "time" "github.com/darkit/cron" ) func main() { // 创建调度器 c := cron.New() // 添加任务 - 安全可靠 c.Schedule("hello", "*/5 * * * * *", func(ctx context.Context) { fmt.Println("每5秒执行一次") }) // 启动调度器 c.Start() defer c.StopGracefully(5 * time.Second) // 等待一段时间观察执行 time.Sleep(30 * time.Second) }

内置安全保障的任务调度

func main() { c := cron.New() // 所有任务都自动具备panic保护,无需特殊API c.Schedule("safe-task", "*/3 * * * * *", func(ctx context.Context) { fmt.Println("安全任务执行") }) // 即使任务内部panic也不会影响调度器 c.Schedule("risky-task", "*/10 * * * * *", func(ctx context.Context) { if someCondition { panic("任务遇到问题") // 这不会让程序崩溃 } fmt.Println("正常执行") }) c.Start() defer c.StopGracefully(5 * time.Second) time.Sleep(time.Minute) }

️ 安全保障机制

永不崩溃的设计

func main() { c := cron.New() // 即使任务panic,程序也不会崩溃(内置保护) c.Schedule("panic-task", "*/5 * * * * *", func(ctx context.Context) { panic("这个panic不会让程序崩溃!") }) c.Start() // 程序会持续运行,不受任务异常影响 }

自定义异常处理

// 自定义panic处理器 type MyPanicHandler struct{} func (h *MyPanicHandler) HandlePanic(taskID string, panicValue interface{}, stack []byte) { log.Printf("️ 任务 %s 异常: %v", taskID, panicValue) // 发送告警、记录日志等 } // 使用自定义处理器 c := cron.New(cron.WithPanicHandler(&MyPanicHandler{}))

生命周期管理 WithContext

优雅关闭与信号集成

func main() { // 创建响应系统信号的上下文 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer cancel() // 调度器绑定到应用生命周期 scheduler := cron.New(cron.WithContext(ctx)) scheduler.Schedule("background-task", "*/10 * * * * *", func(taskCtx context.Context) { // 任务支持优雅取消 select { case <-time.After(5 * time.Second): fmt.Println("任务正常完成") case <-taskCtx.Done(): fmt.Println("任务收到取消信号,优雅退出") return } }) scheduler.Start() // 收到 Ctrl+C 信号时,调度器自动停止 <-ctx.Done() fmt.Println("应用优雅关闭") }

微服务架构中的应用

// Web服务器与调度器生命周期绑定 func startServer() { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) defer cancel() // 调度器与服务器共享生命周期 taskScheduler := cron.New(cron.WithContext(ctx)) taskScheduler.Schedule("health-check", "@every 30s", healthCheckTask) taskScheduler.Start() server := &http.Server{Addr: ":8080"} go server.ListenAndServe() // 等待停止信号 <-ctx.Done() // 调度器自动停止,无需手动管理 server.Shutdown(context.Background()) }

测试场景中的精确控制

func TestTaskExecution(t *testing.T) { // 5秒后自动停止调度器 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() scheduler := cron.New(cron.WithContext(ctx)) var executionCount int64 scheduler.Schedule("test-task", "*/1 * * * * *", func(taskCtx context.Context) { atomic.AddInt64(&executionCount, 1) }) scheduler.Start() <-ctx.Done() // 自动停止 // 验证执行次数 assert.Equal(t, int64(5), executionCount) }

级联取消场景

// 父服务控制子调度器 func parentService(parentCtx context.Context) { // 子上下文继承父级取消信号 childCtx, cancel := context.WithCancel(parentCtx) defer cancel() // 调度器响应父级服务的生命周期 scheduler := cron.New(cron.WithContext(childCtx)) // 父服务停止时,调度器自动停止 }

自定义日志系统

使用自定义 Logger

// 自定义日志实现 type MyLogger struct{} func (l *MyLogger) Debugf(format string, args ...any) { log.Printf("[DEBUG] "+format, args...) } func (l *MyLogger) Infof(format string, args ...any) { log.Printf("[INFO] "+format, args...) } func (l *MyLogger) Warnf(format string, args ...any) { log.Printf("[WARN] "+format, args...) } func (l *MyLogger) Errorf(format string, args ...any) { log.Printf("[ERROR] "+format, args...) } // 创建使用自定义logger的调度器 c := cron.New(cron.WithLogger(&MyLogger{})) // 事件钩子示例:输出耗时 hook := func(ev cron.Event) { if ev.End.IsZero() { // 开始事件 return } slog.Info("task done", "id", ev.TaskID, "success", ev.Success, "dur", ev.Duration, "retries", ev.Retries, "err", ev.Error) } c := cron.New(cron.WithEventHook(hook))

默认日志实现

// 默认使用log/slog,输出结构化日志: // time=2025-08-10T18:30:54.360+08:00 level=INFO msg="Starting cron scheduler" // 禁用日志输出 c := cron.New(cron.WithLogger(&cron.NoOpLogger{}))

Job 接口支持

实现 Job 接口

type BackupJob struct { name string path string } func (j *BackupJob) Name() string { return j.name } func (j *BackupJob) Run(ctx context.Context) error { fmt.Printf("执行备份任务: %s,路径: %s\n", j.name, j.path) // 支持上下文取消 select { case <-time.After(2 * time.Second): return nil case <-ctx.Done(): return ctx.Err() } } func main() { c := cron.New() job := &BackupJob{name: "daily-backup", path: "/data"} // 方式1:传统API - 手动指定任务ID c.ScheduleJob("backup", "0 2 * * *", job) // 方式2:新API - 自动使用Job的Name()方法 c.ScheduleJobByName("0 2 * * *", job) c.Start() defer c.Stop() }

工厂模式 - 更简洁优雅的 API

// 工厂函数 func NewBackupJob(path string) *BackupJob { return &BackupJob{ name: "backup-job", path: path, } } func NewCleanupJob(target string) *CleanupJob { return &CleanupJob{ name: "cleanup-job", target: target, } } func main() { c := cron.New() // 极简优雅的调度方式 c.ScheduleJobByName("0 2 * * *", NewBackupJob("/data")) // 每天2点备份 c.ScheduleJobByName("0 3 * * *", NewCleanupJob("temp")) // 每天3点清理 c.ScheduleJobByName("*/30 * * * * *", NewMonitorJob()) // 每30秒监控 c.Start() defer c.Stop() }

Job 配置选项

// 使用JobOptions配置任务 c.ScheduleJob("heavy-task", "*/5 * * * * *", job, cron.JobOptions{ Async: true, // 异步执行 Timeout: 30 * time.Second, // 超时时间 MaxConcurrent: 3, // 最大并发数 MaxRetries: 3, // 最大重试次数(-1表示无限重试,0表示不重试) RetryInterval: 1 * time.Second, // 重试间隔(0表示立即重试) })

智能重试机制

固定次数重试

当任务执行失败(panic 或超时)时,自动重试指定次数:

c := cron.New() // 任务失败后最多重试3次,每次间隔1秒 c.Schedule("task-with-retry", "@every 10s", func(ctx context.Context) { if rand.Intn(2) == 0 { panic("模拟任务失败") } fmt.Println("任务执行成功") }, cron.JobOptions{ MaxRetries: 3, // 最多重试3次 RetryInterval: 1 * time.Second, // 重试间隔1秒 }) c.Start() defer c.Stop()

无限重试

对于关键任务,可以设置无限重试直到成功:

// MaxRetries = -1 表示无限重试,直到任务成功 c.Schedule("critical-task", "@every 15s", func(ctx context.Context) { err := performCriticalOperation() if err != nil { panic(err) // 失败时会自动重试 } }, cron.JobOptions{ MaxRetries: -1, // 无限重试 RetryInterval: 500 * time.Millisecond, // 重试间隔500ms })

立即重试

对于需要快速响应的场景,可以设置立即重试(不等待):

// RetryInterval = 0 表示立即重试,不等待 c.Schedule("fast-retry", "@every 20s", func(ctx context.Context) { if !checkCondition() { panic("条件未满足") } processTask() }, cron.JobOptions{ MaxRetries: 5, // 最多重试5次 RetryInterval: 0, // 立即重试(不等待) })

重试与超时结合

重试机制可以与超时控制配合使用:

c.Schedule("timeout-retry", "@every 25s", func(ctx context.Context) { // 执行耗时操作 err := performHeavyTask(ctx) if err != nil { panic(err) } }, cron.JobOptions{ Timeout: 2 * time.Second, // 超时时间2秒 MaxRetries: 5, // 最多重试5次 RetryInterval: 500 * time.Millisecond, // 重试间隔500ms })

重试统计

重试信息会自动记录到统计数据中:

// 获取任务统计信息 stats, ok := c.GetStats("task-with-retry") if ok { fmt.Printf("运行次数: %d\n", stats.RunCount) fmt.Printf("成功次数: %d\n", stats.SuccessCount) fmt.Printf("失败次数: %d\n", stats.FailCount) fmt.Printf("重试总次数: %d\n", stats.RetryCount) // 重试次数统计 }

重试行为说明

配置行为适用场景
MaxRetries = 0不重试,失败即停止非关键任务,快速失败
MaxRetries > 0固定次数重试临时故障恢复,限制资源消耗
MaxRetries = -1无限重试直到成功关键任务,必须成功执行
RetryInterval = 0立即重试,不等待条件检查,快速响应
RetryInterval > 0等待指定时间后重试外部服务恢复,避免频繁重试

上下文取消支持

重试过程中会检查上下文取消信号,确保优雅退出:

ctx, cancel := context.WithCancel(context.Background()) c := cron.New(cron.WithContext(ctx)) c.Schedule("cancellable-retry", "@every 1s", func(taskCtx context.Context) { panic("always fail") }, cron.JobOptions{ MaxRetries: 10, RetryInterval: 200 * time.Millisecond, }) c.Start() // 取消上下文时,重试会立即停止 time.Sleep(600 * time.Millisecond) cancel()

完整示例

查看 examples/retry/ 目录获取更多重试机制的使用示例:

go run examples/retry/main.go

历史记录功能

历史记录功能可以持久化保存任务的执行历史,方便后续查询、分析和审计。

启用历史记录

import ( "github.com/darkit/cron" "github.com/darkit/cron/history" ) // 创建存储(基于文件系统的 KV 存储) storage, err := history.NewFileStorage("/path/to/history") if err != nil { log.Fatal(err) } defer storage.Close() // 创建历史记录器 recorder := history.NewHistoryRecorder(storage) defer recorder.Close() // 创建启用历史记录的调度器 c := cron.New(cron.WithHistoryRecorder(recorder))

基本查询

// 查询所有历史记录 records, err := c.QueryHistory(history.RecordFilter{}) // 查询特定任务的历史 records, err := c.QueryHistory(history.RecordFilter{ TaskID: "my-task", }) // 仅查询成功的记录 records, err := c.QueryHistory(history.RecordFilter{ SuccessOnly: true, }) // 仅查询失败的记录 records, err := c.QueryHistory(history.RecordFilter{ FailedOnly: true, })

时间范围查询

// 查询最近 24 小时的记录 now := time.Now() yesterday := now.Add(-24 * time.Hour) records, err := c.QueryHistory(history.RecordFilter{ StartTime: &yesterday, EndTime: &now, })

分页查询

// 查询前 10 条记录 records, err := c.QueryHistory(history.RecordFilter{ Limit: 10, }) // 跳过前 20 条,获取接下来的 10 条 records, err := c.QueryHistory(history.RecordFilter{ Offset: 20, Limit: 10, })

统计记录数量

// 统计所有记录 count, err := c.CountHistory(history.RecordFilter{}) // 统计特定任务的记录 count, err := c.CountHistory(history.RecordFilter{ TaskID: "my-task", }) // 统计失败的记录 count, err := c.CountHistory(history.RecordFilter{ FailedOnly: true, })

清理旧记录

// 清理 7 天前的历史记录 sevenDaysAgo := time.Now().Add(-7 * 24 * time.Hour) deleted, err := c.CleanupHistory(sevenDaysAgo) fmt.Printf("已删除 %d 条历史记录\n", deleted) // 定期清理(推荐在定时任务中执行) c.Schedule("cleanup-history", "@daily", func(ctx context.Context) { thirtyDaysAgo := time.Now().Add(-30 * 24 * time.Hour) deleted, err := c.CleanupHistory(thirtyDaysAgo) if err != nil { log.Printf("清理历史记录失败: %v", err) return } log.Printf("已清理 %d 条历史记录", deleted) })

记录内容

每条历史记录包含以下信息:

type ExecutionRecord struct { ID string // 记录唯一标识 TaskID string // 任务ID StartTime time.Time // 开始时间 EndTime time.Time // 结束时间 Duration time.Duration // 执行耗时 Success bool // 是否成功 RetryCount int // 重试次数 Error string // 错误信息(如果失败) }

存储结构

历史记录采用日期分片存储策略:

<baseDir>/ ├── task-id-1/ │ ├── 2025-10-30.json │ ├── 2025-10-31.json │ └── ... ├── task-id-2/ │ ├── 2025-10-30.json │ └── ... └── ...

优势:

  • 按任务和日期分片,提升查询效率
  • 文件格式简单,易于备份和恢复
  • 支持增量清理,避免大文件问题

性能考虑

  1. 异步写入:历史记录器使用异步队列,不阻塞任务执行
  2. 批量查询:日期分片支持高效的时间范围查询
  3. 自动清理:定期清理旧记录,控制存储空间

完整示例

查看 examples/history/ 目录获取完整的历史记录使用示例:

go run examples/history/main.go

Web Dashboard 管理界面

Dashboard 是一个独立的 Web 管理界面子包,提供可视化的任务管理和监控功能。

功能特性

  • 任务列表 - 实时查看所有任务的状态和统计信息
  • 执行历史 - 查询和分析任务执行历史记录
  • 统计分析 - 系统级统计数据和成功率分析
  • 任务管理 - 支持移除任务等管理操作
  • 实时刷新 - 每 5 秒自动刷新数据

快速开始

import ( "github.com/darkit/cron" "github.com/darkit/cron/dashboard" "github.com/darkit/cron/history" ) func main() { // 创建历史记录存储 storage, _ := history.NewFileStorage("./history") recorder := history.NewHistoryRecorder(storage) defer recorder.Close() // 创建调度器 c := cron.New(cron.WithHistoryRecorder(recorder)) // 添加任务 c.Schedule("task-1", "@every 10s", func(ctx context.Context) { fmt.Println("任务执行") }) c.Start() defer c.Stop() // 启动 Dashboard dashboardServer := dashboard.NewServer(c, ":8080") dashboardServer.Start() defer dashboardServer.Stop() // 访问 http://localhost:8080 select {} }

API 端点

Dashboard 提供以下 RESTful API:

  • GET /api/tasks - 获取所有任务列表
  • GET /api/tasks/{id} - 获取任务详情
  • DELETE /api/tasks/{id} - 移除任务
  • GET /api/stats - 获取统计信息
  • GET /api/history - 查询历史记录(支持过滤和分页)

完整示例

查看 examples/dashboard/ 目录获取完整示例:

go run examples/dashboard/main.go

详细文档请参考:Dashboard README

自动注册功能

RegisteredJob 接口

type CleanupJob struct{} func (j *CleanupJob) Name() string { return "cleanup" } func (j *CleanupJob) Schedule() string { return "0 3 * * *" } func (j *CleanupJob) Run(ctx context.Context) error { fmt.Println("执行清理任务") return nil } func init() { // 在包初始化时自动注册 cron.SafeRegisterJob(&CleanupJob{}) } func main() { c := cron.New() // 批量调度所有已注册的任务 c.ScheduleRegistered() c.Start() defer c.Stop() }

监控和统计

func main() { c := cron.New() c.Schedule("monitored", "*/10 * * * * *", func(ctx context.Context) { fmt.Println("监控任务执行") }) c.Start() defer c.Stop() // 获取任务统计 if stats, ok := c.GetStats("monitored"); ok { fmt.Printf("运行次数: %d, 成功次数: %d\n", stats.RunCount, stats.SuccessCount) } // 获取所有任务统计 allStats := c.GetAllStats() for id, stats := range allStats { fmt.Printf("任务 %s: %+v\n", id, stats) } }

运行时管理

func main() { c := cron.New() // 添加任务 c.Schedule("task1", "*/5 * * * * *", handler1) c.Schedule("task2", "*/10 * * * * *", handler2) c.Start() // 列出所有任务 tasks := c.List() fmt.Printf("当前任务: %v\n", tasks) // 获取下次执行时间 if nextRun, err := c.NextRun("task1"); err == nil { fmt.Printf("task1下次执行: %v\n", nextRun) } // 移除任务 c.Remove("task2") c.Stop() }

Cron 表达式支持

标准表达式

// 6段表达式(秒级) "*/30 * * * * *" // 每30秒 "0 */5 * * * *" // 每5分钟 "0 0 8 * * *" // 每天8点 // 5段表达式(分钟级) "*/5 * * * *" // 每5分钟 "0 8 * * *" // 每天8点 "0 0 1 * *" // 每月1日

描述符语法

// 预定义描述符 "@hourly" // 每小时 "@daily" // 每天 "@weekly" // 每周 "@monthly" // 每月 "@yearly" // 每年 // @every语法 "@every 5s" // 每5秒 "@every 1m" // 每1分钟 "@every 2h" // 每2小时

时区前缀

"TZ=America/New_York 0 0 * * *" // 纽约时间每天 0 点执行 "CRON_TZ=Asia/Shanghai 0 30 9 * * *" // 上海时间每天 9:30 执行

小贴士:TZ=CRON_TZ= 都遵循 IANA 时区数据库,库内部会自动切换夏令时。

完整示例

运行示例

# 最简示例 go run examples/minimal/main.go # 自动注册演示 go run examples/true-auto-register/main.go # 异常恢复演示 go run examples/panic-recovery/main.go # 自定义Logger演示 go run examples/custom-logger/main.go # 重试机制演示 go run examples/retry/main.go # 历史记录演示 go run examples/history/main.go # Dashboard 管理界面演示 go run examples/dashboard/main.go

示例说明

示例目录功能演示适用场景
examples/minimal/最基本的使用方式快速上手,了解基本 API
examples/true-auto-register/自动注册机制学习 Job 自动注册和批量调度
examples/panic-recovery/异常恢复演示了解安全保障机制
examples/custom-logger/自定义 Logger学习日志系统定制
examples/retry/重试机制演示学习任务失败重试策略
examples/history/历史记录演示学习任务执行历史查询和管理
examples/dashboard/Web Dashboard 演示学习 Web 管理界面的使用
examples/jobs/可复用的任务集合参考任务实现模式

完整 API 文档

核心方法

核心 API(简洁优雅)

// 主力推荐:最简洁优雅的API func (c *Cron) ScheduleJobByName(schedule string, job Job, opts ...JobOptions) error // 显式指定任务名时使用 func (c *Cron) ScheduleJob(id, schedule string, job Job, opts ...JobOptions) error // 函数式调度(简单场景) func (c *Cron) Schedule(id, schedule string, handler func(ctx context.Context)) error // 批量调度 func (c *Cron) ScheduleRegistered(opts ...JobOptions) error // 生命周期管理 func (c *Cron) Start() error func (c *Cron) Stop() func (c *Cron) StopGracefully(timeout time.Duration) func (c *Cron) IsRunning() bool // 任务管理 func (c *Cron) Remove(id string) error func (c *Cron) List() []string func (c *Cron) NextRun(id string) (time.Time, error) func (c *Cron) RunNow(id string) error func (c *Cron) Update(id, schedule string, opts ...JobOptions) error func (c *Cron) Pause(id string) error func (c *Cron) Resume(id string) error func (c *Cron) PauseAll() func (c *Cron) ResumeAll() // 任务详情 func (c *Cron) GetTask(id string) (*TaskInfo, bool) func (c *Cron) GetAllTasks() []*TaskInfo // 监控统计 func (c *Cron) GetStats(id string) (*Stats, bool) func (c *Cron) GetAllStats() map[string]*Stats

配置选项

// 创建选项 func New(opts ...Option) *Cron func WithLogger(logger Logger) Option func WithPanicHandler(handler PanicHandler) Option func WithContext(ctx context.Context) Option // 任务配置 type JobOptions struct { Timeout time.Duration // 任务超时时间 MaxRetries int // 最大重试次数,-1 表示无限重试,0 表示不重试 RetryInterval time.Duration // 重试间隔时间,0 表示立即重试 Async bool // 是否异步执行 MaxConcurrent int // 最大并发数 MisfirePolicy MisfirePolicy // Misfire 策略:skip(跳过)、once(补跑一次)、catchup(追赶) MaxCatchUp int // MisfireCatchUp 策略的最大补跑次数,0 表示使用默认值 5 FailThreshold int // 连续失败阈值,达到后触发暂停 FailWindow time.Duration // 统计失败的时间窗口,0 表示不限窗口 PauseDuration time.Duration // 自动暂停时长 Labels map[string]string // 任务标签元数据 } // 任务详情 type TaskInfo struct { ID string // 任务ID Schedule string // cron表达式 Options JobOptions // 任务配置 Labels map[string]string // 元数据标签 NextRun time.Time // 下次执行时间 IsPaused bool // 是否暂停 IsRunning bool // 是否正在运行 CreatedAt time.Time // 创建时间 }

接口定义

// 基本Job接口 type Job interface { Run(ctx context.Context) error Name() string // 新增:返回任务名称,用于ScheduleJobByName() } // 可注册Job接口 type RegisteredJob interface { Job Name() string // 返回任务名称 Schedule() string // cron调度表达式 } // 最佳实践:统一使用Name()方法,简化API设计 // 示例实现: func (j *MyJob) Name() string { return "my-job" } // 日志接口 type Logger interface { Debugf(format string, args ...any) Infof(format string, args ...any) Warnf(format string, args ...any) Errorf(format string, args ...any) } // 异常处理接口 type PanicHandler interface { HandlePanic(taskID string, panicValue interface{}, stack []byte) }

最佳实践

1. ⭐ API 方法选择指南

// 首选:ScheduleJobByName - 最简洁优雅 c.ScheduleJobByName("0 2 * * *", NewBackupJob("/data")) c.ScheduleJobByName("*/30 * * * * *", NewMonitorJob()) // 备选:ScheduleJob - 需要显式控制任务名时 c.ScheduleJob("custom-name", "0 3 * * *", job) // 简单场景:Schedule - 函数式调度 c.Schedule("simple-task", "*/5 * * * * *", func(ctx context.Context) { fmt.Println("简单任务执行") }) // 现在只有3个核心方法,简洁明了!

2. 利用上下文进行优雅取消

c.Schedule("graceful-task", "*/30 * * * * *", func(ctx context.Context) { for i := 0; i < 100; i++ { select { case <-ctx.Done(): return // 优雅退出 default: processItem(i) } } })

3. 异步处理耗时任务

// 对于耗时任务使用异步执行 c.ScheduleJob("heavy-task", "*/5 * * * * *", heavyJob, cron.JobOptions{ Async: true, Timeout: 2 * time.Minute, })

4. 监控任务执行状态

// 定期检查任务状态 go func() { ticker := time.NewTicker(time.Minute) for range ticker.C { stats := c.GetAllStats() for id, stat := range stats { if stat.IsRunning { log.Printf("任务 %s 正在执行中", id) } } } }()

5. 合理配置重试策略

// 根据任务类型选择合适的重试策略 // 1. 网络请求:固定次数重试 + 合理间隔 c.Schedule("api-call", "@every 1m", apiTask, cron.JobOptions{ MaxRetries: 3, RetryInterval: 5 * time.Second, // 给外部服务恢复时间 Timeout: 10 * time.Second, }) // 2. 关键任务:无限重试 + 较短间隔 c.Schedule("critical-sync", "@every 5m", syncTask, cron.JobOptions{ MaxRetries: -1, // 无限重试直到成功 RetryInterval: 1 * time.Second, }) // 3. 条件检查:立即重试 + 有限次数 c.Schedule("condition-check", "@every 30s", checkTask, cron.JobOptions{ MaxRetries: 5, RetryInterval: 0, // 立即重试 }) // 4. 非关键任务:不重试,快速失败 c.Schedule("optional-task", "@every 1h", optionalTask, cron.JobOptions{ MaxRetries: 0, // 不重试 })

常见问题

Q: 如何确保任务不会因为 panic 而影响其他任务?

A: 所有任务都内置 panic 保护:

// 所有核心API都自动具备panic保护,任务panic不会影响调度器 c.Schedule("task", "*/5 * * * * *", func(ctx context.Context) { // 即使这里panic,其他任务仍然正常执行 panic("任务异常") })

Q: 如何自定义日志输出格式?

A: 实现 Logger 接口:

type CustomLogger struct{} func (l *CustomLogger) Infof(format string, args ...any) { // 自定义输出格式 log.Printf("[CRON] "+format, args...) } // 其他方法... c := cron.New(cron.WithLogger(&CustomLogger{}))

Q: 如何处理长时间运行的任务?

A: 使用异步执行和超时控制:

c.ScheduleJob("long-task", "0 2 * * *", job, cron.JobOptions{ Async: true, // 异步执行,不阻塞调度器 Timeout: 30 * time.Minute, // 设置超时时间 })

Q: 如何在程序启动时自动加载预定义任务?

A: 使用 RegisteredJob 和自动注册:

// 在init函数中注册 func init() { cron.SafeRegisterJob(&MyJob{}) } // 在main函数中批量调度 func main() { c := cron.New() c.ScheduleRegistered() c.Start() }

Q: 任务失败后如何自动重试?

A: 使用 JobOptions 配置重试策略:

// 固定次数重试 c.Schedule("task", "@every 10s", handler, cron.JobOptions{ MaxRetries: 3, // 失败后重试3次 RetryInterval: 1 * time.Second, // 每次重试间隔1秒 }) // 无限重试(关键任务) c.Schedule("critical", "@every 5m", handler, cron.JobOptions{ MaxRetries: -1, // 无限重试直到成功 RetryInterval: 5 * time.Second, }) // 立即重试(快速响应) c.Schedule("fast", "@every 1m", handler, cron.JobOptions{ MaxRetries: 5, RetryInterval: 0, // 立即重试,不等待 })

Q: 如何查看任务的重试统计?

A: 使用 GetStats 获取重试信息:

stats, ok := c.GetStats("task-id") if ok { fmt.Printf("运行次数: %d\n", stats.RunCount) fmt.Printf("成功次数: %d\n", stats.SuccessCount) fmt.Printf("失败次数: %d\n", stats.FailCount) fmt.Printf("重试总次数: %d\n", stats.RetryCount) }

Q: 重试机制支持超时吗?

A: 完全支持,每次重试都会应用超时设置:

c.Schedule("task", "@every 1m", handler, cron.JobOptions{ Timeout: 5 * time.Second, // 每次执行(包括重试)都有5秒超时 MaxRetries: 3, RetryInterval: 2 * time.Second, }) // 超时的任务会被视为失败并触发重试

Q: 如何在调度器停止时中断重试过程?

A: 使用 WithContext 选项,重试过程会响应上下文取消:

ctx, cancel := context.WithCancel(context.Background()) c := cron.New(cron.WithContext(ctx)) c.Schedule("task", "@every 1m", handler, cron.JobOptions{ MaxRetries: 10, RetryInterval: 1 * time.Second, }) c.Start() // 调用 cancel() 会停止调度器并中断所有正在进行的重试 cancel()

故障排除

常见错误

错误信息原因解决方案
task xxx already exists任务 ID 重复使用唯一的任务 ID
invalid cron speccron 表达式错误检查表达式格式
handler cannot be nil处理函数为空确保传入有效函数
scheduler is already running重复启动调度器检查调度器状态

调试建议

// 启用详细日志 logger := cron.NewDefaultLogger() c := cron.New(cron.WithLogger(logger)) // 监控任务状态 stats := c.GetAllStats() for id, stat := range stats { fmt.Printf("任务 %s: 运行%d次, 成功%d次\n", id, stat.RunCount, stat.SuccessCount) }

许可证

MIT License - 查看 LICENSE 文件了解详情。

贡献

欢迎提交 Issue 和 Pull Request!请参考 CONTRIBUTING.md 了解贡献指南。


简洁、安全、易用 - 这就是 Cron 调度库的设计理念!