From aeae94a7d2496a6c7a41cc43ec9376373f074d67 Mon Sep 17 00:00:00 2001 From: Rogee Date: Sat, 15 Mar 2025 21:23:41 +0800 Subject: [PATCH] feat: update cronjob --- pkg/ast/provider/provider.go | 2 +- templates/project/app/jobs/demo_cron.go.tpl | 41 ++++++------------- .../project/app/service/queue/river.go.tpl | 32 +-------------- .../project/providers/job/provider.go.tpl | 26 +++++++----- 4 files changed, 30 insertions(+), 71 deletions(-) diff --git a/pkg/ast/provider/provider.go b/pkg/ast/provider/provider.go index b7cba8a..b012247 100644 --- a/pkg/ast/provider/provider.go +++ b/pkg/ast/provider/provider.go @@ -72,7 +72,7 @@ func Parse(source string) []Provider { return []Provider{} } - if strings.HasSuffix(source, "/provider.go") { + if strings.HasSuffix(source, "/provider.gen.go") { return []Provider{} } diff --git a/templates/project/app/jobs/demo_cron.go.tpl b/templates/project/app/jobs/demo_cron.go.tpl index f85c2eb..401aeb3 100644 --- a/templates/project/app/jobs/demo_cron.go.tpl +++ b/templates/project/app/jobs/demo_cron.go.tpl @@ -3,6 +3,7 @@ package jobs import ( "time" + "github.com/riverqueue/river" . "github.com/riverqueue/river" "github.com/sirupsen/logrus" _ "go.ipao.vip/atom" @@ -16,35 +17,17 @@ type CronJob struct { log *logrus.Entry `inject:"false"` } -func (j *CronJob) Prepare() error { - j.log = logrus.WithField("module", "cron") - return nil -} - -func (CronJob) Kind() string { - return "cron_job" -} - -// InsertOpts implements contracts.CronJob. -func (CronJob) InsertOpts() InsertOpts { - return InsertOpts{ - MaxAttempts: 1, - } -} - // JobArgs implements contracts.CronJob. -func (CronJob) JobArgs() JobArgs { - return SortArgs{ - Strings: []string{"a", "c", "b", "d"}, +func (CronJob) Args() []contracts.CronJobArg { + return []contracts.CronJobArg{ + { + Arg: SortArgs{ + Strings: []string{"a", "b", "c", "d"}, + }, + + Kind: "cron_job", + PeriodicInterval: river.PeriodicInterval(time.Second * 10), + RunOnStart: false, + }, } } - -// Periodic implements contracts.CronJob. -func (cron *CronJob) Periodic() PeriodicSchedule { - return PeriodicInterval(time.Minute) -} - -// RunOnStart implements contracts.CronJob. -func (CronJob) RunOnStart() bool { - return true -} diff --git a/templates/project/app/service/queue/river.go.tpl b/templates/project/app/service/queue/river.go.tpl index 19ee9e3..d9569b1 100644 --- a/templates/project/app/service/queue/river.go.tpl +++ b/templates/project/app/service/queue/river.go.tpl @@ -12,7 +12,6 @@ import ( "{{.ModuleName}}/providers/job" "{{.ModuleName}}/providers/postgres" - "github.com/riverqueue/river" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "go.uber.org/dig" @@ -56,37 +55,10 @@ func Serve(cmd *cobra.Command, args []string) error { log.SetLevel(log.DebugLevel) } - client, err := svc.Job.Client() - if err != nil { + if err := svc.Job.Start(ctx); err != nil { return err } - - for _, cronJob := range svc.CronJobs { - log. - WithField("module", "cron"). - WithField("name", cronJob.Description()). - WithField("duration", cronJob.Periodic().Seconds()). - Info("registering cron job") - - for _, jobArgs := range cronJob.JobArgs() { - client.PeriodicJobs().Add( - river.NewPeriodicJob( - river.PeriodicInterval(cronJob.Periodic()), - func() (river.JobArgs, *river.InsertOpts) { - return jobArgs, cronJob.InsertOpts() - }, - &river.PeriodicJobOpts{ - RunOnStart: cronJob.RunOnStart(), - }, - ), - ) - } - } - - if err := client.Start(ctx); err != nil { - return err - } - defer client.StopAndCancel(ctx) + defer svc.Job.Close(ctx) <-ctx.Done() return nil diff --git a/templates/project/providers/job/provider.go.tpl b/templates/project/providers/job/provider.go.tpl index 1ae574b..6ce6590 100644 --- a/templates/project/providers/job/provider.go.tpl +++ b/templates/project/providers/job/provider.go.tpl @@ -113,19 +113,23 @@ func (q *Job) StopAndCancel(ctx context.Context) error { return client.StopAndCancel(ctx) } -func (q *Job) AddPeriodicJobs(job contracts.CronJob) (rivertype.PeriodicJobHandle, error) { +func (q *Job) AddPeriodicJobs(job contracts.CronJob) (map[string]rivertype.PeriodicJobHandle, error) { client, err := q.Client() if err != nil { - return 0, err + return nil, err } - return client.PeriodicJobs().Add(river.NewPeriodicJob( - job.Periodic(), - func() (river.JobArgs, *river.InsertOpts) { - return job.JobArgs(), lo.ToPtr(job.InsertOpts()) - }, - &river.PeriodicJobOpts{ - RunOnStart: job.RunOnStart(), - }, - )), nil + handles := make(map[string]rivertype.PeriodicJobHandle) + for _, job := range job.Args() { + handles[job.Kind] = client.PeriodicJobs().Add(river.NewPeriodicJob( + job.PeriodicInterval, + func() (river.JobArgs, *river.InsertOpts) { + return job.Arg, lo.ToPtr(job.Arg.InsertOpts()) + }, + &river.PeriodicJobOpts{ + RunOnStart: job.RunOnStart, + }, + )) + } + return handles, nil }