go 有没有能持久化的分布式定时任务库?-灵析社区

我爱上班

写了个基于kratos的服务,基本需求是:可以增删管理定时任务(如配置每天、每周发个统计报告),支持分布式结构,持久化任务。加分项包括:注册回调、结果和日志记录、失败重试等。 看kratos已经支持的transport有两个:asynq和machinery,但似乎都不满足需要基本的持久化任务需求,machinery甚至还不能删除已经添加的任务。 // RegisterPeriodicTask register a periodic task which will be triggered periodically func (server *Server) RegisterPeriodicTask(spec, name string, signature *tasks.Signature) error { //check spec schedule, err := cron.ParseStandard(spec) if err != nil { return err } f := func() { //get lock err := server.lock.LockWithRetries(utils.GetLockName(name, spec), schedule.Next(time.Now()).UnixNano()-1) if err != nil { return } //send task _, err = server.SendTask(tasks.CopySignature(signature)) if err != nil { log.ERROR.Printf("periodic task failed. task name is: %s. error is %s", name, err.Error()) } } //scheduler基于github.com/robfig/cron/v3 _, err = server.scheduler.AddFunc(spec, f) return err } 考虑到服务可能会重启,那么添加过的任务如何恢复呢?如果没有这样的库,考虑基于`github.com/go-co-op/gocron` 自己封装一个,配置的任务保存在redis中,服务启动的时候全量初始化,之后订阅对应的主题,再通过分布式锁来保证执行的唯一性,这样是否可行?

阅读量:283

点赞量:21

问AI
package main import ( "fmt" "github.com/go-co-op/gocron" "github.com/go-redis/redis/v8" "golang.org/x/net/context" "sync" ) var ( redisClient *redis.Client scheduler *gocron.Scheduler ctx = context.Background() ) func init() { redisClient = redis.NewClient(&redis.Options{ Addr: "localhost:6379", }) scheduler = gocron.NewScheduler(time.UTC) } func main() { var wg sync.WaitGroup wg.Add(1) loadTasksFromRedis() scheduler.StartAsync() wg.Wait() } func loadTasksFromRedis() { // 从 Redis 获取所有任务 taskKeys, err := redisClient.Keys(ctx, "task:*").Result() if err != nil { fmt.Println("Error fetching tasks from Redis:", err) return } for _, taskKey := range taskKeys { taskData, err := redisClient.HGetAll(ctx, taskKey).Result() if err != nil { fmt.Println("Error fetching task data from Redis:", err) continue } createTaskFromData(taskData) } } func createTaskFromData(taskData map[string]string) { // 解析 taskData 并创建任务 // ... // 添加任务到调度器 // ... } func executeTask(taskData map[string]string) { // 尝试获取分布式锁 lock, err := redisClient.SetNX(ctx, "lock:"+taskData["id"], 1, time.Duration(taskData["lockDuration"])*time.Second).Result() if err != nil || !lock { fmt.Println("Failed to acquire lock for task:", taskData["id"]) return } // 执行任务 // ... // 释放分布式锁 redisClient.Del(ctx, "lock:"+taskData["id"]) }