From de2ddef3afc1533f1abf08b255f37ba60f88565d Mon Sep 17 00:00:00 2001 From: rogeecn Date: Tue, 11 Mar 2025 09:56:22 +0800 Subject: [PATCH] fix: job --- pkg/ast/provider/provider.go.tpl | 6 ++++ templates/project/app/jobs/demo_cron.go.tpl | 32 +++++++++---------- templates/project/app/jobs/demo_job.go.tpl | 3 -- .../project/providers/job/provider.go.tpl | 20 ++++++++++++ 4 files changed, 42 insertions(+), 19 deletions(-) diff --git a/pkg/ast/provider/provider.go.tpl b/pkg/ast/provider/provider.go.tpl index 42d0fb7..7510c03 100644 --- a/pkg/ast/provider/provider.go.tpl +++ b/pkg/ast/provider/provider.go.tpl @@ -44,6 +44,12 @@ func Provide(opts ...opt.Option) error { } {{- end }} + {{ if eq .Mode "cronjob" }} + if _, err := __job.AddPeriodicJobs(obj); err != nil { + return nil, err + } + {{- end}} + return obj, nil }{{if .ProviderGroup}}, {{.ProviderGroup}}{{end}}); err != nil { return err diff --git a/templates/project/app/jobs/demo_cron.go.tpl b/templates/project/app/jobs/demo_cron.go.tpl index 48eef21..f85c2eb 100644 --- a/templates/project/app/jobs/demo_cron.go.tpl +++ b/templates/project/app/jobs/demo_cron.go.tpl @@ -3,7 +3,7 @@ package jobs import ( "time" - "github.com/riverqueue/river" + . "github.com/riverqueue/river" "github.com/sirupsen/logrus" _ "go.ipao.vip/atom" "go.ipao.vip/atom/contracts" @@ -11,40 +11,40 @@ import ( var _ contracts.CronJob = (*CronJob)(nil) -// @provider contracts.CronJob atom.GroupCronJob +// @provider(cronjob) type CronJob struct { log *logrus.Entry `inject:"false"` } -func (cron *CronJob) Prepare() error { - cron.log = logrus.WithField("module", "cron") +func (j *CronJob) Prepare() error { + j.log = logrus.WithField("module", "cron") return nil } -func (cron *CronJob) Description() string { - return "hello world cron job" +func (CronJob) Kind() string { + return "cron_job" } // InsertOpts implements contracts.CronJob. -func (cron *CronJob) InsertOpts() *river.InsertOpts { - return nil +func (CronJob) InsertOpts() InsertOpts { + return InsertOpts{ + MaxAttempts: 1, + } } // JobArgs implements contracts.CronJob. -func (cron *CronJob) JobArgs() []river.JobArgs { - return []river.JobArgs{ - SortArgs{ - Strings: []string{"a", "c", "b", "d"}, - }, +func (CronJob) JobArgs() JobArgs { + return SortArgs{ + Strings: []string{"a", "c", "b", "d"}, } } // Periodic implements contracts.CronJob. -func (cron *CronJob) Periodic() time.Duration { - return time.Second * 10 +func (cron *CronJob) Periodic() PeriodicSchedule { + return PeriodicInterval(time.Minute) } // RunOnStart implements contracts.CronJob. -func (cron *CronJob) RunOnStart() bool { +func (CronJob) RunOnStart() bool { return true } diff --git a/templates/project/app/jobs/demo_job.go.tpl b/templates/project/app/jobs/demo_job.go.tpl index 9056865..ff6db4a 100644 --- a/templates/project/app/jobs/demo_job.go.tpl +++ b/templates/project/app/jobs/demo_job.go.tpl @@ -11,8 +11,6 @@ import ( _ "go.ipao.vip/atom/contracts" ) -// provider:[except|only] [returnType] [group] - var ( _ JobArgs = SortArgs{} _ JobArgsWithInsertOpts = SortArgs{} @@ -22,7 +20,6 @@ type SortArgs struct { Strings []string `json:"strings"` } -// InsertOpts implements JobArgsWithInsertOpts. func (s SortArgs) InsertOpts() InsertOpts { return InsertOpts{ Queue: QueueDefault, diff --git a/templates/project/providers/job/provider.go.tpl b/templates/project/providers/job/provider.go.tpl index e184c82..1ae574b 100644 --- a/templates/project/providers/job/provider.go.tpl +++ b/templates/project/providers/job/provider.go.tpl @@ -11,8 +11,11 @@ import ( "github.com/pkg/errors" "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivertype" + "github.com/samber/lo" log "github.com/sirupsen/logrus" "go.ipao.vip/atom/container" + "go.ipao.vip/atom/contracts" "go.ipao.vip/atom/opt" ) @@ -109,3 +112,20 @@ func (q *Job) StopAndCancel(ctx context.Context) error { return client.StopAndCancel(ctx) } + +func (q *Job) AddPeriodicJobs(job contracts.CronJob) (rivertype.PeriodicJobHandle, error) { + client, err := q.Client() + if err != nil { + return 0, err + } + + return client.PeriodicJobs().Add(river.NewPeriodicJob( + job.Periodic(), + func() (river.JobArgs, *river.InsertOpts) { + return job.JobArgs(), lo.ToPtr(job.InsertOpts()) + }, + &river.PeriodicJobOpts{ + RunOnStart: job.RunOnStart(), + }, + )), nil +}