From 60b5539061ed73766682af0cf0ee83dae1c51932 Mon Sep 17 00:00:00 2001 From: rogeecn Date: Mon, 17 Mar 2025 11:41:17 +0800 Subject: [PATCH] feat: suppport job cancel --- .../project/providers/job/provider.go.tpl | 89 ++++++++++--------- 1 file changed, 49 insertions(+), 40 deletions(-) diff --git a/templates/project/providers/job/provider.go.tpl b/templates/project/providers/job/provider.go.tpl index a483061..866c2f6 100644 --- a/templates/project/providers/job/provider.go.tpl +++ b/templates/project/providers/job/provider.go.tpl @@ -52,8 +52,10 @@ type Job struct { Workers *river.Workers Driver *riverpgxv5.Driver - l sync.Mutex - client *river.Client[pgx.Tx] + l sync.Mutex + client *river.Client[pgx.Tx] + periodicJobs map[string]rivertype.PeriodicJobHandle + jobs map[string]*rivertype.JobInsertResult } func (q *Job) Close() { @@ -88,51 +90,22 @@ func (q *Job) Client() (*river.Client[pgx.Tx], error) { return q.client, nil } -func (q *Job) Start(ctx context.Context) error { - client, err := q.Client() - if err != nil { - return errors.Wrap(err, "get client failed") +func (q *Job) AddPeriodicJobs(job contracts.CronJob) error { + for _, job := range job.Args() { + if err := q.AddPeriodicJob(job); err != nil { + return err + } } - - if err := client.Start(ctx); err != nil { - return err - } - defer client.StopAndCancel(ctx) - - <-ctx.Done() - return nil } -func (q *Job) StopAndCancel(ctx context.Context) error { +func (q *Job) AddPeriodicJob(job contracts.CronJobArg) error { client, err := q.Client() if err != nil { - return errors.Wrap(err, "get client failed") + return err } - return client.StopAndCancel(ctx) -} - -func (q *Job) AddPeriodicJobs(job contracts.CronJob) (map[string]rivertype.PeriodicJobHandle, error) { - var err error - - handles := make(map[string]rivertype.PeriodicJobHandle) - for _, job := range job.Args() { - handles[job.Kind()], err = q.AddPeriodicJob(job) - if err != nil { - return handles, err - } - } - return handles, nil -} - -func (q *Job) AddPeriodicJob(job contracts.CronJobArg) (rivertype.PeriodicJobHandle, error) { - client, err := q.Client() - if err != nil { - return 0, err - } - - return client.PeriodicJobs().Add(river.NewPeriodicJob( + q.periodicJobs[job.Arg.Kind()] = client.PeriodicJobs().Add(river.NewPeriodicJob( job.PeriodicInterval, func() (river.JobArgs, *river.InsertOpts) { return job.Arg, lo.ToPtr(job.Arg.InsertOpts()) @@ -140,5 +113,41 @@ func (q *Job) AddPeriodicJob(job contracts.CronJobArg) (rivertype.PeriodicJobHan &river.PeriodicJobOpts{ RunOnStart: job.RunOnStart, }, - )), nil + )) + + return nil +} + +func (q *Job) Cancel(kind string) error { + client, err := q.Client() + if err != nil { + return err + } + + if h, ok := q.periodicJobs[kind]; ok { + client.PeriodicJobs().Remove(h) + delete(q.periodicJobs, kind) + return nil + } + + if r, ok := q.jobs[kind]; ok { + _, err = client.JobCancel(q.ctx, r.Job.ID) + if err != nil { + return err + } + delete(q.jobs, kind) + return nil + } + + return errors.New("job by kind(" + kind + ") not found") +} + +func (q *Job) Add(job contracts.JobArgs) error { + client, err := q.Client() + if err != nil { + return err + } + + q.jobs[job.Kind()], err = client.Insert(q.ctx, job, lo.ToPtr(job.InsertOpts())) + return err }