feat: update cronjob

This commit is contained in:
Rogee
2025-03-15 21:23:41 +08:00
parent b7fcf2c3e6
commit aeae94a7d2
4 changed files with 30 additions and 71 deletions

View File

@@ -3,6 +3,7 @@ package jobs
import (
"time"
"github.com/riverqueue/river"
. "github.com/riverqueue/river"
"github.com/sirupsen/logrus"
_ "go.ipao.vip/atom"
@@ -16,35 +17,17 @@ type CronJob struct {
log *logrus.Entry `inject:"false"`
}
func (j *CronJob) Prepare() error {
j.log = logrus.WithField("module", "cron")
return nil
}
func (CronJob) Kind() string {
return "cron_job"
}
// InsertOpts implements contracts.CronJob.
func (CronJob) InsertOpts() InsertOpts {
return InsertOpts{
MaxAttempts: 1,
}
}
// JobArgs implements contracts.CronJob.
func (CronJob) JobArgs() JobArgs {
return SortArgs{
Strings: []string{"a", "c", "b", "d"},
func (CronJob) Args() []contracts.CronJobArg {
return []contracts.CronJobArg{
{
Arg: SortArgs{
Strings: []string{"a", "b", "c", "d"},
},
Kind: "cron_job",
PeriodicInterval: river.PeriodicInterval(time.Second * 10),
RunOnStart: false,
},
}
}
// Periodic implements contracts.CronJob.
func (cron *CronJob) Periodic() PeriodicSchedule {
return PeriodicInterval(time.Minute)
}
// RunOnStart implements contracts.CronJob.
func (CronJob) RunOnStart() bool {
return true
}

View File

@@ -12,7 +12,6 @@ import (
"{{.ModuleName}}/providers/job"
"{{.ModuleName}}/providers/postgres"
"github.com/riverqueue/river"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"go.uber.org/dig"
@@ -56,37 +55,10 @@ func Serve(cmd *cobra.Command, args []string) error {
log.SetLevel(log.DebugLevel)
}
client, err := svc.Job.Client()
if err != nil {
if err := svc.Job.Start(ctx); err != nil {
return err
}
for _, cronJob := range svc.CronJobs {
log.
WithField("module", "cron").
WithField("name", cronJob.Description()).
WithField("duration", cronJob.Periodic().Seconds()).
Info("registering cron job")
for _, jobArgs := range cronJob.JobArgs() {
client.PeriodicJobs().Add(
river.NewPeriodicJob(
river.PeriodicInterval(cronJob.Periodic()),
func() (river.JobArgs, *river.InsertOpts) {
return jobArgs, cronJob.InsertOpts()
},
&river.PeriodicJobOpts{
RunOnStart: cronJob.RunOnStart(),
},
),
)
}
}
if err := client.Start(ctx); err != nil {
return err
}
defer client.StopAndCancel(ctx)
defer svc.Job.Close(ctx)
<-ctx.Done()
return nil

View File

@@ -113,19 +113,23 @@ func (q *Job) StopAndCancel(ctx context.Context) error {
return client.StopAndCancel(ctx)
}
func (q *Job) AddPeriodicJobs(job contracts.CronJob) (rivertype.PeriodicJobHandle, error) {
func (q *Job) AddPeriodicJobs(job contracts.CronJob) (map[string]rivertype.PeriodicJobHandle, error) {
client, err := q.Client()
if err != nil {
return 0, err
return nil, err
}
return client.PeriodicJobs().Add(river.NewPeriodicJob(
job.Periodic(),
func() (river.JobArgs, *river.InsertOpts) {
return job.JobArgs(), lo.ToPtr(job.InsertOpts())
},
&river.PeriodicJobOpts{
RunOnStart: job.RunOnStart(),
},
)), nil
handles := make(map[string]rivertype.PeriodicJobHandle)
for _, job := range job.Args() {
handles[job.Kind] = client.PeriodicJobs().Add(river.NewPeriodicJob(
job.PeriodicInterval,
func() (river.JobArgs, *river.InsertOpts) {
return job.Arg, lo.ToPtr(job.Arg.InsertOpts())
},
&river.PeriodicJobOpts{
RunOnStart: job.RunOnStart,
},
))
}
return handles, nil
}