diff --git a/backend/app/commands/testx/testing.go b/backend/app/commands/testx/testing.go index 467004b..e5d9f3d 100644 --- a/backend/app/commands/testx/testing.go +++ b/backend/app/commands/testx/testing.go @@ -39,6 +39,14 @@ func (w *orderRefundTestWorker) Work(ctx context.Context, job *river.Job[jobs_ar return nil } +type mediaAssetProcessTestWorker struct { + river.WorkerDefaults[jobs_args.MediaAssetProcessJob] +} + +func (w *mediaAssetProcessTestWorker) Work(ctx context.Context, job *river.Job[jobs_args.MediaAssetProcessJob]) error { + return nil +} + func testJobWorkersProvider() container.ProviderContainer { return container.ProviderContainer{ Provider: func(opts ...opt.Option) error { @@ -47,6 +55,11 @@ func testJobWorkersProvider() container.ProviderContainer { if err := river.AddWorkerSafely(__job.Workers, obj); err != nil { return nil, err } + + obj2 := &mediaAssetProcessTestWorker{} + if err := river.AddWorkerSafely(__job.Workers, obj2); err != nil { + return nil, err + } return obj, nil }, atom.GroupInitial) }, diff --git a/backend/app/jobs/args/media_asset_process.go b/backend/app/jobs/args/media_asset_process.go new file mode 100644 index 0000000..81fc7f6 --- /dev/null +++ b/backend/app/jobs/args/media_asset_process.go @@ -0,0 +1,38 @@ +package args + +import ( + "quyun/v2/providers/job" + + . "github.com/riverqueue/river" + "go.ipao.vip/atom/contracts" +) + +var _ contracts.JobArgs = MediaAssetProcessJob{} + +// MediaAssetProcessJob 表示“媒体资源处理”的一次性异步任务。 +// +// 设计说明: +// - 该任务用于将 media_assets 从 processing 推进到 ready/failed(由 worker 执行核心处理逻辑)。 +// - 幂等由 River Unique + 业务侧 DB 状态机共同保证(重复入队会被跳过或无副作用)。 +type MediaAssetProcessJob struct { + // TenantID 租户ID(用于多租户隔离与唯一性维度)。 + TenantID int64 `json:"tenant_id" river:"unique"` + // AssetID 媒体资源ID(用于唯一性维度)。 + AssetID int64 `json:"asset_id" river:"unique"` +} + +func (MediaAssetProcessJob) Kind() string { return "media_asset_process" } + +func (a MediaAssetProcessJob) UniqueID() string { return a.Kind() } + +func (MediaAssetProcessJob) InsertOpts() InsertOpts { + return InsertOpts{ + Queue: job.QueueDefault, + Priority: job.PriorityDefault, + // 失败可重试;由 worker 判断不可重试的场景并 JobCancel。 + MaxAttempts: 10, + UniqueOpts: UniqueOpts{ + ByArgs: true, + }, + } +} diff --git a/backend/app/jobs/media_asset_process.go b/backend/app/jobs/media_asset_process.go new file mode 100644 index 0000000..da8582f --- /dev/null +++ b/backend/app/jobs/media_asset_process.go @@ -0,0 +1,58 @@ +package jobs + +import ( + "context" + "time" + + jobs_args "quyun/v2/app/jobs/args" + "quyun/v2/app/services" + + . "github.com/riverqueue/river" + log "github.com/sirupsen/logrus" +) + +var _ Worker[jobs_args.MediaAssetProcessJob] = (*MediaAssetProcessJobWorker)(nil) + +// MediaAssetProcessJobWorker 负责执行媒体资源处理的异步处理(占位实现)。 +// +// 当前实现为 stub:不对接外部转码/截图服务,仅回写 meta 并将状态置为 ready。 +// +// @provider(job) +type MediaAssetProcessJobWorker struct { + WorkerDefaults[jobs_args.MediaAssetProcessJob] +} + +func (w *MediaAssetProcessJobWorker) Work(ctx context.Context, job *Job[jobs_args.MediaAssetProcessJob]) error { + args := job.Args + + attempt := 0 + if job != nil && job.JobRow != nil { + attempt = job.Attempt + } + + logger := log.WithFields(log.Fields{ + "job_kind": args.Kind(), + "tenant_id": args.TenantID, + "asset_id": args.AssetID, + "attempt": attempt, + }) + + logger.Info("jobs.media_asset_process.start") + + _, err := services.MediaAsset.ProcessSuccess(ctx, args.TenantID, args.AssetID, map[string]any{ + "process_pipeline": "stub", + "worker_attempt": attempt, + "worker_ran_at": time.Now().UTC().Format(time.RFC3339Nano), + }, time.Now().UTC()) + if err != nil { + if services.IsMediaAssetProcessJobNonRetryableError(err) { + logger.WithError(err).Warn("jobs.media_asset_process.cancel") + return JobCancel(err) + } + logger.WithError(err).Warn("jobs.media_asset_process.retry") + return err + } + + logger.Info("jobs.media_asset_process.ok") + return nil +} diff --git a/backend/app/jobs/media_asset_process_test.go b/backend/app/jobs/media_asset_process_test.go new file mode 100644 index 0000000..a880ee5 --- /dev/null +++ b/backend/app/jobs/media_asset_process_test.go @@ -0,0 +1,85 @@ +package jobs + +import ( + "database/sql" + "testing" + "time" + + "quyun/v2/app/commands/testx" + jobs_args "quyun/v2/app/jobs/args" + "quyun/v2/app/services" + "quyun/v2/database" + "quyun/v2/database/models" + "quyun/v2/pkg/consts" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/rivertype" + . "github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/suite" + + _ "go.ipao.vip/atom" + "go.ipao.vip/atom/contracts" + "go.ipao.vip/gen/types" + "go.uber.org/dig" +) + +type MediaAssetProcessJobSuiteInjectParams struct { + dig.In + + DB *sql.DB + Initials []contracts.Initial `group:"initials"` // nolint:structcheck +} + +type MediaAssetProcessJobSuite struct { + suite.Suite + MediaAssetProcessJobSuiteInjectParams +} + +func Test_MediaAssetProcessJob(t *testing.T) { + // 注意:testx.Default() 已内置一个测试用的 job worker 注册器,避免和 jobs.Provide 重复注册同 kind worker。 + providers := testx.Default().With(services.Provide) + + testx.Serve(providers, t, func(p MediaAssetProcessJobSuiteInjectParams) { + suite.Run(t, &MediaAssetProcessJobSuite{MediaAssetProcessJobSuiteInjectParams: p}) + }) +} + +func (s *MediaAssetProcessJobSuite) Test_Work_ProcessingToReady() { + Convey("MediaAssetProcessJobWorker processing -> ready", s.T(), func() { + ctx := s.T().Context() + now := time.Now().UTC() + tenantID := int64(1) + userID := int64(2) + + database.Truncate(ctx, s.DB, models.TableNameMediaAsset) + + asset := &models.MediaAsset{ + TenantID: tenantID, + UserID: userID, + Type: consts.MediaAssetTypeVideo, + Status: consts.MediaAssetStatusProcessing, + Provider: "test", + Bucket: "b", + ObjectKey: "k", + Meta: types.JSON([]byte("{}")), + CreatedAt: now, + UpdatedAt: now, + } + So(asset.Create(ctx), ShouldBeNil) + + worker := &MediaAssetProcessJobWorker{} + err := worker.Work(ctx, &river.Job[jobs_args.MediaAssetProcessJob]{ + JobRow: &rivertype.JobRow{Attempt: 1}, + Args: jobs_args.MediaAssetProcessJob{TenantID: tenantID, AssetID: asset.ID}, + }) + So(err, ShouldBeNil) + + tbl, query := models.MediaAssetQuery.QueryContext(ctx) + got, err := query.Where( + tbl.TenantID.Eq(tenantID), + tbl.ID.Eq(asset.ID), + ).First() + So(err, ShouldBeNil) + So(got.Status, ShouldEqual, consts.MediaAssetStatusReady) + }) +} diff --git a/backend/app/jobs/provider.gen.go b/backend/app/jobs/provider.gen.go index 82ee41c..474a4fc 100755 --- a/backend/app/jobs/provider.gen.go +++ b/backend/app/jobs/provider.gen.go @@ -11,6 +11,18 @@ import ( ) func Provide(opts ...opt.Option) error { + if err := container.Container.Provide(func( + __job *job.Job, + ) (contracts.Initial, error) { + obj := &MediaAssetProcessJobWorker{} + if err := river.AddWorkerSafely(__job.Workers, obj); err != nil { + return nil, err + } + + return obj, nil + }, atom.GroupInitial); err != nil { + return err + } if err := container.Container.Provide(func( __job *job.Job, ) (contracts.Initial, error) { diff --git a/backend/app/services/media_asset.go b/backend/app/services/media_asset.go index 1433d0e..b6f7eda 100644 --- a/backend/app/services/media_asset.go +++ b/backend/app/services/media_asset.go @@ -12,9 +12,11 @@ import ( "quyun/v2/app/errorx" tenant_dto "quyun/v2/app/http/tenant/dto" + jobs_args "quyun/v2/app/jobs/args" "quyun/v2/app/requests" "quyun/v2/database/models" "quyun/v2/pkg/consts" + provider_job "quyun/v2/providers/job" pkgerrors "github.com/pkg/errors" "github.com/samber/lo" @@ -29,7 +31,30 @@ import ( // mediaAsset 提供媒体资源上传初始化等能力(上传/处理链路会在后续里程碑补齐)。 // // @provider -type mediaAsset struct{} +type mediaAsset struct { + job *provider_job.Job +} + +func IsMediaAssetProcessJobNonRetryableError(err error) bool { + var appErr *errorx.AppError + if !errors.As(err, &appErr) { + return false + } + switch appErr.Code { + case errorx.CodeInvalidParameter, + errorx.CodeRecordNotFound, + errorx.CodeStatusConflict, + errorx.CodePreconditionFailed, + errorx.CodePermissionDenied: + return true + default: + return false + } +} + +func (s *mediaAsset) enqueueMediaAssetProcessJob(args jobs_args.MediaAssetProcessJob) error { + return s.job.Add(args) +} func mediaAssetTransitionAllowed(from, to consts.MediaAssetStatus) bool { switch from { @@ -218,7 +243,11 @@ func (s *mediaAsset) AdminUploadComplete( "asset_id": assetID, }).Info("services.media_asset.admin.upload_complete") - var out models.MediaAsset + var ( + out models.MediaAsset + needEnqueue bool + enqueueArgs jobs_args.MediaAssetProcessJob + ) err := models.Q.Transaction(func(tx *models.Query) error { tbl, query := tx.MediaAsset.QueryContext(ctx) @@ -241,11 +270,16 @@ func (s *mediaAsset) AdminUploadComplete( return errorx.ErrPreconditionFailed.WithMsg("media asset deleted") } - // 幂等:重复 upload_complete 时返回现态。 + // 幂等:重复 upload_complete 时返回现态;但只要处于 processing,就允许再次触发入队(用于“上次入队失败”的补偿重试)。 switch m.Status { - case consts.MediaAssetStatusProcessing, consts.MediaAssetStatusReady, consts.MediaAssetStatusFailed: + case consts.MediaAssetStatusReady, consts.MediaAssetStatusFailed: out = *m return nil + case consts.MediaAssetStatusProcessing: + out = *m + needEnqueue = true + enqueueArgs = jobs_args.MediaAssetProcessJob{TenantID: tenantID, AssetID: assetID} + return nil case consts.MediaAssetStatusUploaded: // allowed default: @@ -294,19 +328,32 @@ func (s *mediaAsset) AdminUploadComplete( m.UpdatedAt = now out = *m - // 触发异步处理(当前为 stub):后续接入队列/任务系统时在此处落任务并保持幂等。 - logrus.WithFields(logrus.Fields{ - "tenant_id": tenantID, - "user_id": operatorUserID, - "asset_id": assetID, - "status": m.Status, - }).Info("services.media_asset.process.triggered") + needEnqueue = true + enqueueArgs = jobs_args.MediaAssetProcessJob{TenantID: tenantID, AssetID: assetID} return nil }) if err != nil { return nil, err } + + if needEnqueue { + // 注意:River 的唯一约束会将重复入队“软跳过”,因此这里允许多次触发以补偿偶发入队失败。 + if err := s.enqueueMediaAssetProcessJob(enqueueArgs); err != nil { + logrus.WithFields(logrus.Fields{ + "tenant_id": tenantID, + "user_id": operatorUserID, + "asset_id": assetID, + }).WithError(err).Warn("services.media_asset.process.enqueue_failed") + return nil, err + } + logrus.WithFields(logrus.Fields{ + "tenant_id": tenantID, + "user_id": operatorUserID, + "asset_id": assetID, + }).Info("services.media_asset.process.enqueued") + } + return &out, nil } diff --git a/backend/app/services/media_asset_test.go b/backend/app/services/media_asset_test.go index 94511f8..666d247 100644 --- a/backend/app/services/media_asset_test.go +++ b/backend/app/services/media_asset_test.go @@ -9,6 +9,7 @@ import ( "quyun/v2/app/commands/testx" "quyun/v2/app/errorx" tenant_dto "quyun/v2/app/http/tenant/dto" + jobs_args "quyun/v2/app/jobs/args" "quyun/v2/database" "quyun/v2/database/models" "quyun/v2/pkg/consts" @@ -109,3 +110,49 @@ func (s *MediaAssetTestSuite) Test_AdminUploadInit_VariantAndSource() { }) }) } + +func (s *MediaAssetTestSuite) Test_AdminUploadComplete_EnqueueAndProcess() { + Convey("MediaAsset.AdminUploadComplete enqueue job and worker process", s.T(), func() { + ctx := s.T().Context() + now := time.Now().UTC() + tenantID := int64(1) + userID := int64(2) + + database.Truncate(ctx, s.DB, "river_job", models.TableNameMediaAsset) + + asset := &models.MediaAsset{ + TenantID: tenantID, + UserID: userID, + Type: consts.MediaAssetTypeVideo, + Status: consts.MediaAssetStatusUploaded, + Provider: "test", + Bucket: "b", + ObjectKey: "k", + Meta: []byte("{}"), + CreatedAt: now, + UpdatedAt: now, + } + So(asset.Create(ctx), ShouldBeNil) + + Convey("首次 upload_complete:uploaded -> processing,并入队一次", func() { + out, err := MediaAsset.AdminUploadComplete(ctx, tenantID, userID, asset.ID, nil, now) + So(err, ShouldBeNil) + So(out.Status, ShouldEqual, consts.MediaAssetStatusProcessing) + + var cnt int + err = s.DB.QueryRowContext(ctx, "SELECT COUNT(1) FROM river_job WHERE kind = $1", jobs_args.MediaAssetProcessJob{}.Kind()).Scan(&cnt) + So(err, ShouldBeNil) + So(cnt, ShouldEqual, 1) + + Convey("重复 upload_complete:仍可触发入队,但不会产生重复任务", func() { + out2, err := MediaAsset.AdminUploadComplete(ctx, tenantID, userID, asset.ID, nil, now.Add(1*time.Second)) + So(err, ShouldBeNil) + So(out2.Status, ShouldEqual, consts.MediaAssetStatusProcessing) + + err = s.DB.QueryRowContext(ctx, "SELECT COUNT(1) FROM river_job WHERE kind = $1", jobs_args.MediaAssetProcessJob{}.Kind()).Scan(&cnt) + So(err, ShouldBeNil) + So(cnt, ShouldEqual, 1) + }) + }) + }) +} diff --git a/backend/app/services/provider.gen.go b/backend/app/services/provider.gen.go index 2f2214e..5f92244 100755 --- a/backend/app/services/provider.gen.go +++ b/backend/app/services/provider.gen.go @@ -30,8 +30,12 @@ func Provide(opts ...opt.Option) error { }); err != nil { return err } - if err := container.Container.Provide(func() (*mediaAsset, error) { - obj := &mediaAsset{} + if err := container.Container.Provide(func( + job *provider_job.Job, + ) (*mediaAsset, error) { + obj := &mediaAsset{ + job: job, + } return obj, nil }); err != nil {