diff --git a/templates/project/providers/job/config.go.tpl b/templates/project/providers/job/config.go.tpl index 77b0f7d..cf54cc8 100644 --- a/templates/project/providers/job/config.go.tpl +++ b/templates/project/providers/job/config.go.tpl @@ -17,7 +17,19 @@ func DefaultProvider() container.ProviderContainer { } } -type Config struct{} +type Config struct { + // Optional per-queue worker concurrency. If empty, defaults apply. + QueueWorkers QueueWorkersConfig +} + +// QueueWorkers allows configuring worker concurrency per queue. +// Key is the queue name, value is MaxWorkers. If empty, defaults are used. +// Example TOML: +// +// [Job] +// # high=20, default=10, low=5 +// # QueueWorkers = { high = 20, default = 10, low = 5 } +type QueueWorkersConfig map[string]int const ( PriorityDefault = river.PriorityDefault @@ -31,3 +43,25 @@ const ( QueueDefault = river.QueueDefault QueueLow = "low" ) + +// queueConfig returns a river.QueueConfig map built from QueueWorkers or defaults. +func (c *Config) queueConfig() map[string]river.QueueConfig { + cfg := map[string]river.QueueConfig{} + if c == nil || len(c.QueueWorkers) == 0 { + cfg[QueueHigh] = river.QueueConfig{MaxWorkers: 10} + cfg[QueueDefault] = river.QueueConfig{MaxWorkers: 10} + cfg[QueueLow] = river.QueueConfig{MaxWorkers: 10} + return cfg + } + for name, n := range c.QueueWorkers { + if n <= 0 { + n = 1 + } + cfg[name] = river.QueueConfig{MaxWorkers: n} + } + + if _, ok := cfg[QueueDefault]; !ok { + cfg[QueueDefault] = river.QueueConfig{MaxWorkers: 10} + } + return cfg +} diff --git a/templates/project/providers/job/provider.go.tpl b/templates/project/providers/job/provider.go.tpl index 6613007..a35d3cb 100644 --- a/templates/project/providers/job/provider.go.tpl +++ b/templates/project/providers/job/provider.go.tpl @@ -2,9 +2,11 @@ package job import ( "context" + "fmt" "sync" + "time" - "{{.ModuleName}}/providers/postgres" + "test/providers/postgres" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" @@ -37,10 +39,22 @@ func Provide(opts ...opt.Option) error { if err != nil { return nil, err } + // health check ping with timeout + pingCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + if err := dbPool.Ping(pingCtx); err != nil { + return nil, fmt.Errorf("job provider: db ping failed: %w", err) + } container.AddCloseAble(dbPool.Close) pool := riverpgxv5.New(dbPool) - queue := &Job{Workers: workers, driver: pool, ctx: ctx, periodicJobs: make(map[string]rivertype.PeriodicJobHandle), jobs: make(map[string]*rivertype.JobInsertResult)} + queue := &Job{ + Workers: workers, + driver: pool, + ctx: ctx, + conf: &config, + periodicJobs: make(map[string]rivertype.PeriodicJobHandle), + } container.AddCloseAble(queue.Close) return queue, nil @@ -49,6 +63,7 @@ func Provide(opts ...opt.Option) error { type Job struct { ctx context.Context + conf *Config Workers *river.Workers driver *riverpgxv5.Driver @@ -56,7 +71,6 @@ type Job struct { client *river.Client[pgx.Tx] periodicJobs map[string]rivertype.PeriodicJobHandle - jobs map[string]*rivertype.JobInsertResult } func (q *Job) Close() { @@ -67,6 +81,10 @@ func (q *Job) Close() { if err := q.client.StopAndCancel(q.ctx); err != nil { log.Errorf("Failed to stop and cancel client: %s", err) } + // clear references + q.l.Lock() + q.periodicJobs = map[string]rivertype.PeriodicJobHandle{} + q.l.Unlock() } func (q *Job) Client() (*river.Client[pgx.Tx], error) { @@ -77,11 +95,7 @@ func (q *Job) Client() (*river.Client[pgx.Tx], error) { var err error q.client, err = river.NewClient(q.driver, &river.Config{ Workers: q.Workers, - Queues: map[string]river.QueueConfig{ - QueueHigh: {MaxWorkers: 10}, - QueueDefault: {MaxWorkers: 10}, - QueueLow: {MaxWorkers: 10}, - }, + Queues: q.conf.queueConfig(), }) if err != nil { return nil, err @@ -158,15 +172,21 @@ func (q *Job) Cancel(id string) error { if h, ok := q.periodicJobs[id]; ok { client.PeriodicJobs().Remove(h) delete(q.periodicJobs, id) - return nil } + return nil +} - if r, ok := q.jobs[id]; ok { - _, err = client.JobCancel(q.ctx, r.Job.ID) - if err != nil { - return err - } - delete(q.jobs, id) +// CancelContext is like Cancel but allows passing a context. +func (q *Job) CancelContext(ctx context.Context, id string) error { + client, err := q.Client() + if err != nil { + return err + } + q.l.Lock() + defer q.l.Unlock() + if h, ok := q.periodicJobs[id]; ok { + client.PeriodicJobs().Remove(h) + delete(q.periodicJobs, id) return nil } @@ -182,6 +202,6 @@ func (q *Job) Add(job contracts.JobArgs) error { q.l.Lock() defer q.l.Unlock() - q.jobs[job.UniqueID()], err = client.Insert(q.ctx, job, lo.ToPtr(job.InsertOpts())) + _, err = client.Insert(q.ctx, job, lo.ToPtr(job.InsertOpts())) return err }