package testx import ( "context" "os" "testing" jobs_args "quyun/v2/app/jobs/args" "quyun/v2/database" "quyun/v2/database/models" "quyun/v2/providers/job" "quyun/v2/providers/jwt" "quyun/v2/providers/postgres" "quyun/v2/providers/storage" "github.com/riverqueue/river" "go.ipao.vip/atom" "go.ipao.vip/atom/container" "go.ipao.vip/atom/contracts" "go.ipao.vip/atom/opt" "go.uber.org/dig" "github.com/rogeecn/fabfile" . "github.com/smartystreets/goconvey/convey" ) func Default(providers ...container.ProviderContainer) container.Providers { return append(container.Providers{ postgres.DefaultProvider(), jwt.DefaultProvider(), job.DefaultProvider(), storage.DefaultProvider(), testJobWorkersProvider(), database.DefaultProvider(), }, providers...) } type orderRefundTestWorker struct { river.WorkerDefaults[jobs_args.OrderRefundJob] } func (w *orderRefundTestWorker) Work(ctx context.Context, job *river.Job[jobs_args.OrderRefundJob]) error { return nil } type mediaAssetProcessTestWorker struct { river.WorkerDefaults[jobs_args.MediaAssetProcessJob] } func (w *mediaAssetProcessTestWorker) Work(ctx context.Context, job *river.Job[jobs_args.MediaAssetProcessJob]) error { return nil } type notificationTestWorker struct { river.WorkerDefaults[jobs_args.NotificationArgs] } func (w *notificationTestWorker) Work(ctx context.Context, job *river.Job[jobs_args.NotificationArgs]) error { arg := job.Args n := &models.Notification{ TenantID: arg.TenantID, UserID: arg.UserID, Type: arg.Type, Title: arg.Title, Content: arg.Content, IsRead: false, } return models.NotificationQuery.WithContext(ctx).Create(n) } func testJobWorkersProvider() container.ProviderContainer { return container.ProviderContainer{ Provider: func(opts ...opt.Option) error { return container.Container.Provide(func(__job *job.Job) (contracts.Initial, error) { obj := &orderRefundTestWorker{} if err := river.AddWorkerSafely(__job.Workers, obj); err != nil { return nil, err } obj2 := &mediaAssetProcessTestWorker{} if err := river.AddWorkerSafely(__job.Workers, obj2); err != nil { return nil, err } obj3 := ¬ificationTestWorker{} if err := river.AddWorkerSafely(__job.Workers, obj3); err != nil { return nil, err } return obj, nil }, atom.GroupInitial) }, } } func Serve(providers container.Providers, t *testing.T, invoke any) { Convey("tests boot up", t, func() { // 关键语义:测试用例可能会在同一进程内多次调用 Serve。 // atom/config.Load 会向全局 dig 容器重复 Provide *viper.Viper,若不重置会导致 “already provided”。 // 因此每次测试启动前都重置容器,保证各测试套件相互独立。 container.Close() container.Container = dig.New() So(container.Container.Provide(func() context.Context { return context.Background() }), ShouldBeNil) file := fabfile.MustFind("config.toml") // 支持通过 ENV_LOCAL 指定测试环境配置:config..toml localEnv := os.Getenv("ENV_LOCAL") if localEnv != "" { file = fabfile.MustFind("config." + localEnv + ".toml") } So(atom.LoadProviders(file, providers), ShouldBeNil) So(os.Setenv("JOB_INLINE", "1"), ShouldBeNil) t.Cleanup(func() { _ = os.Unsetenv("JOB_INLINE") }) So(container.Container.Invoke(func(p struct { dig.In Initials []contracts.Initial `group:"initials"` Job *job.Job }, ) error { _ = p.Initials ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) go func() { _ = p.Job.Start(ctx) }() return nil }), ShouldBeNil) So(container.Container.Invoke(invoke), ShouldBeNil) }) }