feat: update job

This commit is contained in:
rogeecn
2025-03-21 16:56:26 +08:00
parent dca8874f1a
commit b83d36f917

View File

@@ -2,6 +2,7 @@ package job
import ( import (
"context" "context"
"quyun/providers/postgres"
"sync" "sync"
"{{.ModuleName}}/providers/postgres" "{{.ModuleName}}/providers/postgres"
@@ -17,6 +18,18 @@ import (
"go.ipao.vip/atom/container" "go.ipao.vip/atom/container"
"go.ipao.vip/atom/contracts" "go.ipao.vip/atom/contracts"
"go.ipao.vip/atom/opt" "go.ipao.vip/atom/opt"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/pkg/errors"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivertype"
"github.com/samber/lo"
log "github.com/sirupsen/logrus"
"go.ipao.vip/atom/container"
"go.ipao.vip/atom/contracts"
"go.ipao.vip/atom/opt"
) )
func Provide(opts ...opt.Option) error { func Provide(opts ...opt.Option) error {
@@ -48,14 +61,13 @@ func Provide(opts ...opt.Option) error {
} }
type Job struct { type Job struct {
ctx context.Context
Workers *river.Workers Workers *river.Workers
driver *riverpgxv5.Driver
l sync.Mutex l sync.Mutex
client *river.Client[pgx.Tx]
ctx context.Context
driver *riverpgxv5.Driver
client *river.Client[pgx.Tx]
periodicJobs map[string]rivertype.PeriodicJobHandle periodicJobs map[string]rivertype.PeriodicJobHandle
jobs map[string]*rivertype.JobInsertResult jobs map[string]*rivertype.JobInsertResult
} }
@@ -92,6 +104,31 @@ func (q *Job) Client() (*river.Client[pgx.Tx], error) {
return q.client, nil return q.client, nil
} }
func (q *Job) Start(ctx context.Context) error {
client, err := q.Client()
if err != nil {
return errors.Wrap(err, "get client failed")
}
if err := client.Start(ctx); err != nil {
return err
}
defer client.StopAndCancel(ctx)
<-ctx.Done()
return nil
}
func (q *Job) StopAndCancel(ctx context.Context) error {
client, err := q.Client()
if err != nil {
return errors.Wrap(err, "get client failed")
}
return client.StopAndCancel(ctx)
}
func (q *Job) AddPeriodicJobs(job contracts.CronJob) error { func (q *Job) AddPeriodicJobs(job contracts.CronJob) error {
for _, job := range job.Args() { for _, job := range job.Args() {
if err := q.AddPeriodicJob(job); err != nil { if err := q.AddPeriodicJob(job); err != nil {
@@ -109,7 +146,7 @@ func (q *Job) AddPeriodicJob(job contracts.CronJobArg) error {
q.l.Lock() q.l.Lock()
defer q.l.Unlock() defer q.l.Unlock()
q.periodicJobs[job.Arg.Kind()] = client.PeriodicJobs().Add(river.NewPeriodicJob( q.periodicJobs[job.Arg.UniqueID()] = client.PeriodicJobs().Add(river.NewPeriodicJob(
job.PeriodicInterval, job.PeriodicInterval,
func() (river.JobArgs, *river.InsertOpts) { func() (river.JobArgs, *river.InsertOpts) {
return job.Arg, lo.ToPtr(job.Arg.InsertOpts()) return job.Arg, lo.ToPtr(job.Arg.InsertOpts())
@@ -146,7 +183,7 @@ func (q *Job) Cancel(kind string) error {
return nil return nil
} }
return errors.New("job by kind(" + kind + ") not found") return nil
} }
func (q *Job) Add(job contracts.JobArgs) error { func (q *Job) Add(job contracts.JobArgs) error {
@@ -158,6 +195,6 @@ func (q *Job) Add(job contracts.JobArgs) error {
q.l.Lock() q.l.Lock()
defer q.l.Unlock() defer q.l.Unlock()
q.jobs[job.Kind()], err = client.Insert(q.ctx, job, lo.ToPtr(job.InsertOpts())) q.jobs[job.UniqueID()], err = client.Insert(q.ctx, job, lo.ToPtr(job.InsertOpts()))
return err return err
} }