diff --git a/templates/project/providers/job/provider.go.tpl b/templates/project/providers/job/provider.go.tpl index 2e24ca6..0aadd3f 100644 --- a/templates/project/providers/job/provider.go.tpl +++ b/templates/project/providers/job/provider.go.tpl @@ -2,6 +2,7 @@ package job import ( "context" + "quyun/providers/postgres" "sync" "{{.ModuleName}}/providers/postgres" @@ -17,6 +18,18 @@ import ( "go.ipao.vip/atom/container" "go.ipao.vip/atom/contracts" "go.ipao.vip/atom/opt" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/pkg/errors" + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivertype" + "github.com/samber/lo" + log "github.com/sirupsen/logrus" + "go.ipao.vip/atom/container" + "go.ipao.vip/atom/contracts" + "go.ipao.vip/atom/opt" ) func Provide(opts ...opt.Option) error { @@ -48,14 +61,13 @@ func Provide(opts ...opt.Option) error { } type Job struct { + ctx context.Context Workers *river.Workers + driver *riverpgxv5.Driver - l sync.Mutex + l sync.Mutex + client *river.Client[pgx.Tx] - ctx context.Context - driver *riverpgxv5.Driver - - client *river.Client[pgx.Tx] periodicJobs map[string]rivertype.PeriodicJobHandle jobs map[string]*rivertype.JobInsertResult } @@ -92,6 +104,31 @@ func (q *Job) Client() (*river.Client[pgx.Tx], error) { return q.client, nil } +func (q *Job) Start(ctx context.Context) error { + client, err := q.Client() + if err != nil { + return errors.Wrap(err, "get client failed") + } + + if err := client.Start(ctx); err != nil { + return err + } + defer client.StopAndCancel(ctx) + + <-ctx.Done() + + return nil +} + +func (q *Job) StopAndCancel(ctx context.Context) error { + client, err := q.Client() + if err != nil { + return errors.Wrap(err, "get client failed") + } + + return client.StopAndCancel(ctx) +} + func (q *Job) AddPeriodicJobs(job contracts.CronJob) error { for _, job := range job.Args() { if err := q.AddPeriodicJob(job); err != nil { @@ -109,7 +146,7 @@ func (q *Job) AddPeriodicJob(job contracts.CronJobArg) error { q.l.Lock() defer q.l.Unlock() - q.periodicJobs[job.Arg.Kind()] = client.PeriodicJobs().Add(river.NewPeriodicJob( + q.periodicJobs[job.Arg.UniqueID()] = client.PeriodicJobs().Add(river.NewPeriodicJob( job.PeriodicInterval, func() (river.JobArgs, *river.InsertOpts) { return job.Arg, lo.ToPtr(job.Arg.InsertOpts()) @@ -146,7 +183,7 @@ func (q *Job) Cancel(kind string) error { return nil } - return errors.New("job by kind(" + kind + ") not found") + return nil } func (q *Job) Add(job contracts.JobArgs) error { @@ -158,6 +195,6 @@ func (q *Job) Add(job contracts.JobArgs) error { q.l.Lock() defer q.l.Unlock() - q.jobs[job.Kind()], err = client.Insert(q.ctx, job, lo.ToPtr(job.InsertOpts())) + q.jobs[job.UniqueID()], err = client.Insert(q.ctx, job, lo.ToPtr(job.InsertOpts())) return err }