From d013d6c8657698feb13771288cbf0d4ad510b668 Mon Sep 17 00:00:00 2001 From: Rogee Date: Fri, 27 Dec 2024 20:54:41 +0800 Subject: [PATCH] feat: add job --- .../project/pkg/service/queue/error.go.tpl | 24 ++++++ .../project/pkg/service/queue/river.go.tpl | 70 +++++++++++++++ templates/project/providers/job/config.go.tpl | 25 ++++++ .../project/providers/job/provider.go.tpl | 85 +++++++++++++++++++ 4 files changed, 204 insertions(+) create mode 100644 templates/project/pkg/service/queue/error.go.tpl create mode 100644 templates/project/pkg/service/queue/river.go.tpl create mode 100644 templates/project/providers/job/config.go.tpl create mode 100644 templates/project/providers/job/provider.go.tpl diff --git a/templates/project/pkg/service/queue/error.go.tpl b/templates/project/pkg/service/queue/error.go.tpl new file mode 100644 index 0000000..3300b00 --- /dev/null +++ b/templates/project/pkg/service/queue/error.go.tpl @@ -0,0 +1,24 @@ +package queue + +import ( + "context" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/rivertype" + log "github.com/sirupsen/logrus" +) + +type CustomErrorHandler struct{} + +func (*CustomErrorHandler) HandleError(ctx context.Context, job *rivertype.JobRow, err error) *river.ErrorHandlerResult { + log.Infof("Job errored with: %s\n", err) + return nil +} + +func (*CustomErrorHandler) HandlePanic(ctx context.Context, job *rivertype.JobRow, panicVal any, trace string) *river.ErrorHandlerResult { + log.Infof("Job panicked with: %v\n", panicVal) + log.Infof("Stack trace: %s\n", trace) + return &river.ErrorHandlerResult{ + SetCancelled: true, + } +} diff --git a/templates/project/pkg/service/queue/river.go.tpl b/templates/project/pkg/service/queue/river.go.tpl new file mode 100644 index 0000000..a0e0eda --- /dev/null +++ b/templates/project/pkg/service/queue/river.go.tpl @@ -0,0 +1,70 @@ +package queue + +import ( + "context" + + "demo01/app/jobs" + "demo01/pkg/service" + "demo01/providers/app" + "demo01/providers/job" + "demo01/providers/postgres" + + "git.ipao.vip/rogeecn/atom" + "git.ipao.vip/rogeecn/atom/container" + "git.ipao.vip/rogeecn/atom/contracts" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "go.uber.org/dig" +) + +func defaultProviders() container.Providers { + return service.Default(container.Providers{ + postgres.DefaultProvider(), + job.DefaultProvider(), + }...) +} + +func Command() atom.Option { + return atom.Command( + atom.Name("queue"), + atom.Short("start queue processor"), + atom.RunE(Serve), + atom.Providers( + defaultProviders(). + With( + jobs.Provide, + ), + ), + ) +} + +type Service struct { + dig.In + + App *app.Config + Job *job.Job + Initials []contracts.Initial `group:"initials"` +} + +func Serve(cmd *cobra.Command, args []string) error { + return container.Container.Invoke(func(ctx context.Context, svc Service) error { + log.SetFormatter(&log.JSONFormatter{}) + + if svc.App.IsDevMode() { + log.SetLevel(log.DebugLevel) + } + + client, err := svc.Job.Client() + if err != nil { + return err + } + + if err := client.Start(ctx); err != nil { + return err + } + defer client.StopAndCancel(ctx) + + <-ctx.Done() + return nil + }) +} diff --git a/templates/project/providers/job/config.go.tpl b/templates/project/providers/job/config.go.tpl new file mode 100644 index 0000000..76a5695 --- /dev/null +++ b/templates/project/providers/job/config.go.tpl @@ -0,0 +1,25 @@ +package job + +import ( + "git.ipao.vip/rogeecn/atom/container" + "git.ipao.vip/rogeecn/atom/utils/opt" +) + +const DefaultPrefix = "Job" + +func DefaultProvider() container.ProviderContainer { + return container.ProviderContainer{ + Provider: Provide, + Options: []opt.Option{ + opt.Prefix(DefaultPrefix), + }, + } +} + +type Config struct{} + +const ( + PriorityHigh = "high" + PriorityDefault = "default" + PriorityLow = "low" +) diff --git a/templates/project/providers/job/provider.go.tpl b/templates/project/providers/job/provider.go.tpl new file mode 100644 index 0000000..c9a83f6 --- /dev/null +++ b/templates/project/providers/job/provider.go.tpl @@ -0,0 +1,85 @@ +package job + +import ( + "context" + "sync" + + "demo01/providers/postgres" + + "git.ipao.vip/rogeecn/atom/container" + "git.ipao.vip/rogeecn/atom/utils/opt" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + log "github.com/sirupsen/logrus" +) + +func Provide(opts ...opt.Option) error { + o := opt.New(opts...) + var config Config + if err := o.UnmarshalConfig(&config); err != nil { + return err + } + return container.Container.Provide(func(ctx context.Context, dbConf *postgres.Config) (*Job, error) { + workers := river.NewWorkers() + + dbPoolConfig, err := pgxpool.ParseConfig(dbConf.DSN()) + if err != nil { + return nil, err + } + + dbPool, err := pgxpool.NewWithConfig(ctx, dbPoolConfig) + if err != nil { + return nil, err + } + container.AddCloseAble(dbPool.Close) + pool := riverpgxv5.New(dbPool) + + queue := &Job{Workers: workers, Driver: pool, ctx: ctx} + container.AddCloseAble(queue.Close) + + return queue, nil + }, o.DiOptions()...) +} + +type Job struct { + ctx context.Context + Workers *river.Workers + Driver *riverpgxv5.Driver + + l sync.Mutex + client *river.Client[pgx.Tx] +} + +func (q *Job) Close() { + if q.client == nil { + return + } + + if err := q.client.StopAndCancel(q.ctx); err != nil { + log.Errorf("Failed to stop and cancel client: %s", err) + } +} + +func (q *Job) Client() (*river.Client[pgx.Tx], error) { + q.l.Lock() + defer q.l.Unlock() + + if q.client == nil { + var err error + q.client, err = river.NewClient(q.Driver, &river.Config{ + Workers: q.Workers, + Queues: map[string]river.QueueConfig{ + PriorityHigh: {MaxWorkers: 10}, + PriorityDefault: {MaxWorkers: 10}, + PriorityLow: {MaxWorkers: 10}, + }, + }) + if err != nil { + return nil, err + } + } + + return q.client, nil +}