This commit is contained in:
rogeecn
2025-03-11 09:56:22 +08:00
parent 94dd15b4e9
commit de2ddef3af
4 changed files with 42 additions and 19 deletions

View File

@@ -44,6 +44,12 @@ func Provide(opts ...opt.Option) error {
} }
{{- end }} {{- end }}
{{ if eq .Mode "cronjob" }}
if _, err := __job.AddPeriodicJobs(obj); err != nil {
return nil, err
}
{{- end}}
return obj, nil return obj, nil
}{{if .ProviderGroup}}, {{.ProviderGroup}}{{end}}); err != nil { }{{if .ProviderGroup}}, {{.ProviderGroup}}{{end}}); err != nil {
return err return err

View File

@@ -3,7 +3,7 @@ package jobs
import ( import (
"time" "time"
"github.com/riverqueue/river" . "github.com/riverqueue/river"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
_ "go.ipao.vip/atom" _ "go.ipao.vip/atom"
"go.ipao.vip/atom/contracts" "go.ipao.vip/atom/contracts"
@@ -11,40 +11,40 @@ import (
var _ contracts.CronJob = (*CronJob)(nil) var _ contracts.CronJob = (*CronJob)(nil)
// @provider contracts.CronJob atom.GroupCronJob // @provider(cronjob)
type CronJob struct { type CronJob struct {
log *logrus.Entry `inject:"false"` log *logrus.Entry `inject:"false"`
} }
func (cron *CronJob) Prepare() error { func (j *CronJob) Prepare() error {
cron.log = logrus.WithField("module", "cron") j.log = logrus.WithField("module", "cron")
return nil return nil
} }
func (cron *CronJob) Description() string { func (CronJob) Kind() string {
return "hello world cron job" return "cron_job"
} }
// InsertOpts implements contracts.CronJob. // InsertOpts implements contracts.CronJob.
func (cron *CronJob) InsertOpts() *river.InsertOpts { func (CronJob) InsertOpts() InsertOpts {
return nil return InsertOpts{
MaxAttempts: 1,
}
} }
// JobArgs implements contracts.CronJob. // JobArgs implements contracts.CronJob.
func (cron *CronJob) JobArgs() []river.JobArgs { func (CronJob) JobArgs() JobArgs {
return []river.JobArgs{ return SortArgs{
SortArgs{ Strings: []string{"a", "c", "b", "d"},
Strings: []string{"a", "c", "b", "d"},
},
} }
} }
// Periodic implements contracts.CronJob. // Periodic implements contracts.CronJob.
func (cron *CronJob) Periodic() time.Duration { func (cron *CronJob) Periodic() PeriodicSchedule {
return time.Second * 10 return PeriodicInterval(time.Minute)
} }
// RunOnStart implements contracts.CronJob. // RunOnStart implements contracts.CronJob.
func (cron *CronJob) RunOnStart() bool { func (CronJob) RunOnStart() bool {
return true return true
} }

View File

@@ -11,8 +11,6 @@ import (
_ "go.ipao.vip/atom/contracts" _ "go.ipao.vip/atom/contracts"
) )
// provider:[except|only] [returnType] [group]
var ( var (
_ JobArgs = SortArgs{} _ JobArgs = SortArgs{}
_ JobArgsWithInsertOpts = SortArgs{} _ JobArgsWithInsertOpts = SortArgs{}
@@ -22,7 +20,6 @@ type SortArgs struct {
Strings []string `json:"strings"` Strings []string `json:"strings"`
} }
// InsertOpts implements JobArgsWithInsertOpts.
func (s SortArgs) InsertOpts() InsertOpts { func (s SortArgs) InsertOpts() InsertOpts {
return InsertOpts{ return InsertOpts{
Queue: QueueDefault, Queue: QueueDefault,

View File

@@ -11,8 +11,11 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/riverqueue/river" "github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivertype"
"github.com/samber/lo"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.ipao.vip/atom/container" "go.ipao.vip/atom/container"
"go.ipao.vip/atom/contracts"
"go.ipao.vip/atom/opt" "go.ipao.vip/atom/opt"
) )
@@ -109,3 +112,20 @@ func (q *Job) StopAndCancel(ctx context.Context) error {
return client.StopAndCancel(ctx) return client.StopAndCancel(ctx)
} }
func (q *Job) AddPeriodicJobs(job contracts.CronJob) (rivertype.PeriodicJobHandle, error) {
client, err := q.Client()
if err != nil {
return 0, err
}
return client.PeriodicJobs().Add(river.NewPeriodicJob(
job.Periodic(),
func() (river.JobArgs, *river.InsertOpts) {
return job.JobArgs(), lo.ToPtr(job.InsertOpts())
},
&river.PeriodicJobOpts{
RunOnStart: job.RunOnStart(),
},
)), nil
}