feat: 添加媒体资源处理的异步任务及相关逻辑
This commit is contained in:
@@ -39,6 +39,14 @@ func (w *orderRefundTestWorker) Work(ctx context.Context, job *river.Job[jobs_ar
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type mediaAssetProcessTestWorker struct {
|
||||||
|
river.WorkerDefaults[jobs_args.MediaAssetProcessJob]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *mediaAssetProcessTestWorker) Work(ctx context.Context, job *river.Job[jobs_args.MediaAssetProcessJob]) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func testJobWorkersProvider() container.ProviderContainer {
|
func testJobWorkersProvider() container.ProviderContainer {
|
||||||
return container.ProviderContainer{
|
return container.ProviderContainer{
|
||||||
Provider: func(opts ...opt.Option) error {
|
Provider: func(opts ...opt.Option) error {
|
||||||
@@ -47,6 +55,11 @@ func testJobWorkersProvider() container.ProviderContainer {
|
|||||||
if err := river.AddWorkerSafely(__job.Workers, obj); err != nil {
|
if err := river.AddWorkerSafely(__job.Workers, obj); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
obj2 := &mediaAssetProcessTestWorker{}
|
||||||
|
if err := river.AddWorkerSafely(__job.Workers, obj2); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return obj, nil
|
return obj, nil
|
||||||
}, atom.GroupInitial)
|
}, atom.GroupInitial)
|
||||||
},
|
},
|
||||||
|
|||||||
38
backend/app/jobs/args/media_asset_process.go
Normal file
38
backend/app/jobs/args/media_asset_process.go
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
package args
|
||||||
|
|
||||||
|
import (
|
||||||
|
"quyun/v2/providers/job"
|
||||||
|
|
||||||
|
. "github.com/riverqueue/river"
|
||||||
|
"go.ipao.vip/atom/contracts"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ contracts.JobArgs = MediaAssetProcessJob{}
|
||||||
|
|
||||||
|
// MediaAssetProcessJob 表示“媒体资源处理”的一次性异步任务。
|
||||||
|
//
|
||||||
|
// 设计说明:
|
||||||
|
// - 该任务用于将 media_assets 从 processing 推进到 ready/failed(由 worker 执行核心处理逻辑)。
|
||||||
|
// - 幂等由 River Unique + 业务侧 DB 状态机共同保证(重复入队会被跳过或无副作用)。
|
||||||
|
type MediaAssetProcessJob struct {
|
||||||
|
// TenantID 租户ID(用于多租户隔离与唯一性维度)。
|
||||||
|
TenantID int64 `json:"tenant_id" river:"unique"`
|
||||||
|
// AssetID 媒体资源ID(用于唯一性维度)。
|
||||||
|
AssetID int64 `json:"asset_id" river:"unique"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (MediaAssetProcessJob) Kind() string { return "media_asset_process" }
|
||||||
|
|
||||||
|
func (a MediaAssetProcessJob) UniqueID() string { return a.Kind() }
|
||||||
|
|
||||||
|
func (MediaAssetProcessJob) InsertOpts() InsertOpts {
|
||||||
|
return InsertOpts{
|
||||||
|
Queue: job.QueueDefault,
|
||||||
|
Priority: job.PriorityDefault,
|
||||||
|
// 失败可重试;由 worker 判断不可重试的场景并 JobCancel。
|
||||||
|
MaxAttempts: 10,
|
||||||
|
UniqueOpts: UniqueOpts{
|
||||||
|
ByArgs: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
58
backend/app/jobs/media_asset_process.go
Normal file
58
backend/app/jobs/media_asset_process.go
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
package jobs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
jobs_args "quyun/v2/app/jobs/args"
|
||||||
|
"quyun/v2/app/services"
|
||||||
|
|
||||||
|
. "github.com/riverqueue/river"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ Worker[jobs_args.MediaAssetProcessJob] = (*MediaAssetProcessJobWorker)(nil)
|
||||||
|
|
||||||
|
// MediaAssetProcessJobWorker 负责执行媒体资源处理的异步处理(占位实现)。
|
||||||
|
//
|
||||||
|
// 当前实现为 stub:不对接外部转码/截图服务,仅回写 meta 并将状态置为 ready。
|
||||||
|
//
|
||||||
|
// @provider(job)
|
||||||
|
type MediaAssetProcessJobWorker struct {
|
||||||
|
WorkerDefaults[jobs_args.MediaAssetProcessJob]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *MediaAssetProcessJobWorker) Work(ctx context.Context, job *Job[jobs_args.MediaAssetProcessJob]) error {
|
||||||
|
args := job.Args
|
||||||
|
|
||||||
|
attempt := 0
|
||||||
|
if job != nil && job.JobRow != nil {
|
||||||
|
attempt = job.Attempt
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := log.WithFields(log.Fields{
|
||||||
|
"job_kind": args.Kind(),
|
||||||
|
"tenant_id": args.TenantID,
|
||||||
|
"asset_id": args.AssetID,
|
||||||
|
"attempt": attempt,
|
||||||
|
})
|
||||||
|
|
||||||
|
logger.Info("jobs.media_asset_process.start")
|
||||||
|
|
||||||
|
_, err := services.MediaAsset.ProcessSuccess(ctx, args.TenantID, args.AssetID, map[string]any{
|
||||||
|
"process_pipeline": "stub",
|
||||||
|
"worker_attempt": attempt,
|
||||||
|
"worker_ran_at": time.Now().UTC().Format(time.RFC3339Nano),
|
||||||
|
}, time.Now().UTC())
|
||||||
|
if err != nil {
|
||||||
|
if services.IsMediaAssetProcessJobNonRetryableError(err) {
|
||||||
|
logger.WithError(err).Warn("jobs.media_asset_process.cancel")
|
||||||
|
return JobCancel(err)
|
||||||
|
}
|
||||||
|
logger.WithError(err).Warn("jobs.media_asset_process.retry")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("jobs.media_asset_process.ok")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
85
backend/app/jobs/media_asset_process_test.go
Normal file
85
backend/app/jobs/media_asset_process_test.go
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
package jobs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"quyun/v2/app/commands/testx"
|
||||||
|
jobs_args "quyun/v2/app/jobs/args"
|
||||||
|
"quyun/v2/app/services"
|
||||||
|
"quyun/v2/database"
|
||||||
|
"quyun/v2/database/models"
|
||||||
|
"quyun/v2/pkg/consts"
|
||||||
|
|
||||||
|
"github.com/riverqueue/river"
|
||||||
|
"github.com/riverqueue/river/rivertype"
|
||||||
|
. "github.com/smartystreets/goconvey/convey"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
|
_ "go.ipao.vip/atom"
|
||||||
|
"go.ipao.vip/atom/contracts"
|
||||||
|
"go.ipao.vip/gen/types"
|
||||||
|
"go.uber.org/dig"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MediaAssetProcessJobSuiteInjectParams struct {
|
||||||
|
dig.In
|
||||||
|
|
||||||
|
DB *sql.DB
|
||||||
|
Initials []contracts.Initial `group:"initials"` // nolint:structcheck
|
||||||
|
}
|
||||||
|
|
||||||
|
type MediaAssetProcessJobSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
MediaAssetProcessJobSuiteInjectParams
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_MediaAssetProcessJob(t *testing.T) {
|
||||||
|
// 注意:testx.Default() 已内置一个测试用的 job worker 注册器,避免和 jobs.Provide 重复注册同 kind worker。
|
||||||
|
providers := testx.Default().With(services.Provide)
|
||||||
|
|
||||||
|
testx.Serve(providers, t, func(p MediaAssetProcessJobSuiteInjectParams) {
|
||||||
|
suite.Run(t, &MediaAssetProcessJobSuite{MediaAssetProcessJobSuiteInjectParams: p})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MediaAssetProcessJobSuite) Test_Work_ProcessingToReady() {
|
||||||
|
Convey("MediaAssetProcessJobWorker processing -> ready", s.T(), func() {
|
||||||
|
ctx := s.T().Context()
|
||||||
|
now := time.Now().UTC()
|
||||||
|
tenantID := int64(1)
|
||||||
|
userID := int64(2)
|
||||||
|
|
||||||
|
database.Truncate(ctx, s.DB, models.TableNameMediaAsset)
|
||||||
|
|
||||||
|
asset := &models.MediaAsset{
|
||||||
|
TenantID: tenantID,
|
||||||
|
UserID: userID,
|
||||||
|
Type: consts.MediaAssetTypeVideo,
|
||||||
|
Status: consts.MediaAssetStatusProcessing,
|
||||||
|
Provider: "test",
|
||||||
|
Bucket: "b",
|
||||||
|
ObjectKey: "k",
|
||||||
|
Meta: types.JSON([]byte("{}")),
|
||||||
|
CreatedAt: now,
|
||||||
|
UpdatedAt: now,
|
||||||
|
}
|
||||||
|
So(asset.Create(ctx), ShouldBeNil)
|
||||||
|
|
||||||
|
worker := &MediaAssetProcessJobWorker{}
|
||||||
|
err := worker.Work(ctx, &river.Job[jobs_args.MediaAssetProcessJob]{
|
||||||
|
JobRow: &rivertype.JobRow{Attempt: 1},
|
||||||
|
Args: jobs_args.MediaAssetProcessJob{TenantID: tenantID, AssetID: asset.ID},
|
||||||
|
})
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
|
||||||
|
tbl, query := models.MediaAssetQuery.QueryContext(ctx)
|
||||||
|
got, err := query.Where(
|
||||||
|
tbl.TenantID.Eq(tenantID),
|
||||||
|
tbl.ID.Eq(asset.ID),
|
||||||
|
).First()
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(got.Status, ShouldEqual, consts.MediaAssetStatusReady)
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -11,6 +11,18 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func Provide(opts ...opt.Option) error {
|
func Provide(opts ...opt.Option) error {
|
||||||
|
if err := container.Container.Provide(func(
|
||||||
|
__job *job.Job,
|
||||||
|
) (contracts.Initial, error) {
|
||||||
|
obj := &MediaAssetProcessJobWorker{}
|
||||||
|
if err := river.AddWorkerSafely(__job.Workers, obj); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return obj, nil
|
||||||
|
}, atom.GroupInitial); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
if err := container.Container.Provide(func(
|
if err := container.Container.Provide(func(
|
||||||
__job *job.Job,
|
__job *job.Job,
|
||||||
) (contracts.Initial, error) {
|
) (contracts.Initial, error) {
|
||||||
|
|||||||
@@ -12,9 +12,11 @@ import (
|
|||||||
|
|
||||||
"quyun/v2/app/errorx"
|
"quyun/v2/app/errorx"
|
||||||
tenant_dto "quyun/v2/app/http/tenant/dto"
|
tenant_dto "quyun/v2/app/http/tenant/dto"
|
||||||
|
jobs_args "quyun/v2/app/jobs/args"
|
||||||
"quyun/v2/app/requests"
|
"quyun/v2/app/requests"
|
||||||
"quyun/v2/database/models"
|
"quyun/v2/database/models"
|
||||||
"quyun/v2/pkg/consts"
|
"quyun/v2/pkg/consts"
|
||||||
|
provider_job "quyun/v2/providers/job"
|
||||||
|
|
||||||
pkgerrors "github.com/pkg/errors"
|
pkgerrors "github.com/pkg/errors"
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
@@ -29,7 +31,30 @@ import (
|
|||||||
// mediaAsset 提供媒体资源上传初始化等能力(上传/处理链路会在后续里程碑补齐)。
|
// mediaAsset 提供媒体资源上传初始化等能力(上传/处理链路会在后续里程碑补齐)。
|
||||||
//
|
//
|
||||||
// @provider
|
// @provider
|
||||||
type mediaAsset struct{}
|
type mediaAsset struct {
|
||||||
|
job *provider_job.Job
|
||||||
|
}
|
||||||
|
|
||||||
|
func IsMediaAssetProcessJobNonRetryableError(err error) bool {
|
||||||
|
var appErr *errorx.AppError
|
||||||
|
if !errors.As(err, &appErr) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
switch appErr.Code {
|
||||||
|
case errorx.CodeInvalidParameter,
|
||||||
|
errorx.CodeRecordNotFound,
|
||||||
|
errorx.CodeStatusConflict,
|
||||||
|
errorx.CodePreconditionFailed,
|
||||||
|
errorx.CodePermissionDenied:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *mediaAsset) enqueueMediaAssetProcessJob(args jobs_args.MediaAssetProcessJob) error {
|
||||||
|
return s.job.Add(args)
|
||||||
|
}
|
||||||
|
|
||||||
func mediaAssetTransitionAllowed(from, to consts.MediaAssetStatus) bool {
|
func mediaAssetTransitionAllowed(from, to consts.MediaAssetStatus) bool {
|
||||||
switch from {
|
switch from {
|
||||||
@@ -218,7 +243,11 @@ func (s *mediaAsset) AdminUploadComplete(
|
|||||||
"asset_id": assetID,
|
"asset_id": assetID,
|
||||||
}).Info("services.media_asset.admin.upload_complete")
|
}).Info("services.media_asset.admin.upload_complete")
|
||||||
|
|
||||||
var out models.MediaAsset
|
var (
|
||||||
|
out models.MediaAsset
|
||||||
|
needEnqueue bool
|
||||||
|
enqueueArgs jobs_args.MediaAssetProcessJob
|
||||||
|
)
|
||||||
|
|
||||||
err := models.Q.Transaction(func(tx *models.Query) error {
|
err := models.Q.Transaction(func(tx *models.Query) error {
|
||||||
tbl, query := tx.MediaAsset.QueryContext(ctx)
|
tbl, query := tx.MediaAsset.QueryContext(ctx)
|
||||||
@@ -241,11 +270,16 @@ func (s *mediaAsset) AdminUploadComplete(
|
|||||||
return errorx.ErrPreconditionFailed.WithMsg("media asset deleted")
|
return errorx.ErrPreconditionFailed.WithMsg("media asset deleted")
|
||||||
}
|
}
|
||||||
|
|
||||||
// 幂等:重复 upload_complete 时返回现态。
|
// 幂等:重复 upload_complete 时返回现态;但只要处于 processing,就允许再次触发入队(用于“上次入队失败”的补偿重试)。
|
||||||
switch m.Status {
|
switch m.Status {
|
||||||
case consts.MediaAssetStatusProcessing, consts.MediaAssetStatusReady, consts.MediaAssetStatusFailed:
|
case consts.MediaAssetStatusReady, consts.MediaAssetStatusFailed:
|
||||||
out = *m
|
out = *m
|
||||||
return nil
|
return nil
|
||||||
|
case consts.MediaAssetStatusProcessing:
|
||||||
|
out = *m
|
||||||
|
needEnqueue = true
|
||||||
|
enqueueArgs = jobs_args.MediaAssetProcessJob{TenantID: tenantID, AssetID: assetID}
|
||||||
|
return nil
|
||||||
case consts.MediaAssetStatusUploaded:
|
case consts.MediaAssetStatusUploaded:
|
||||||
// allowed
|
// allowed
|
||||||
default:
|
default:
|
||||||
@@ -294,19 +328,32 @@ func (s *mediaAsset) AdminUploadComplete(
|
|||||||
m.UpdatedAt = now
|
m.UpdatedAt = now
|
||||||
out = *m
|
out = *m
|
||||||
|
|
||||||
// 触发异步处理(当前为 stub):后续接入队列/任务系统时在此处落任务并保持幂等。
|
needEnqueue = true
|
||||||
logrus.WithFields(logrus.Fields{
|
enqueueArgs = jobs_args.MediaAssetProcessJob{TenantID: tenantID, AssetID: assetID}
|
||||||
"tenant_id": tenantID,
|
|
||||||
"user_id": operatorUserID,
|
|
||||||
"asset_id": assetID,
|
|
||||||
"status": m.Status,
|
|
||||||
}).Info("services.media_asset.process.triggered")
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if needEnqueue {
|
||||||
|
// 注意:River 的唯一约束会将重复入队“软跳过”,因此这里允许多次触发以补偿偶发入队失败。
|
||||||
|
if err := s.enqueueMediaAssetProcessJob(enqueueArgs); err != nil {
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"tenant_id": tenantID,
|
||||||
|
"user_id": operatorUserID,
|
||||||
|
"asset_id": assetID,
|
||||||
|
}).WithError(err).Warn("services.media_asset.process.enqueue_failed")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"tenant_id": tenantID,
|
||||||
|
"user_id": operatorUserID,
|
||||||
|
"asset_id": assetID,
|
||||||
|
}).Info("services.media_asset.process.enqueued")
|
||||||
|
}
|
||||||
|
|
||||||
return &out, nil
|
return &out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
"quyun/v2/app/commands/testx"
|
"quyun/v2/app/commands/testx"
|
||||||
"quyun/v2/app/errorx"
|
"quyun/v2/app/errorx"
|
||||||
tenant_dto "quyun/v2/app/http/tenant/dto"
|
tenant_dto "quyun/v2/app/http/tenant/dto"
|
||||||
|
jobs_args "quyun/v2/app/jobs/args"
|
||||||
"quyun/v2/database"
|
"quyun/v2/database"
|
||||||
"quyun/v2/database/models"
|
"quyun/v2/database/models"
|
||||||
"quyun/v2/pkg/consts"
|
"quyun/v2/pkg/consts"
|
||||||
@@ -109,3 +110,49 @@ func (s *MediaAssetTestSuite) Test_AdminUploadInit_VariantAndSource() {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *MediaAssetTestSuite) Test_AdminUploadComplete_EnqueueAndProcess() {
|
||||||
|
Convey("MediaAsset.AdminUploadComplete enqueue job and worker process", s.T(), func() {
|
||||||
|
ctx := s.T().Context()
|
||||||
|
now := time.Now().UTC()
|
||||||
|
tenantID := int64(1)
|
||||||
|
userID := int64(2)
|
||||||
|
|
||||||
|
database.Truncate(ctx, s.DB, "river_job", models.TableNameMediaAsset)
|
||||||
|
|
||||||
|
asset := &models.MediaAsset{
|
||||||
|
TenantID: tenantID,
|
||||||
|
UserID: userID,
|
||||||
|
Type: consts.MediaAssetTypeVideo,
|
||||||
|
Status: consts.MediaAssetStatusUploaded,
|
||||||
|
Provider: "test",
|
||||||
|
Bucket: "b",
|
||||||
|
ObjectKey: "k",
|
||||||
|
Meta: []byte("{}"),
|
||||||
|
CreatedAt: now,
|
||||||
|
UpdatedAt: now,
|
||||||
|
}
|
||||||
|
So(asset.Create(ctx), ShouldBeNil)
|
||||||
|
|
||||||
|
Convey("首次 upload_complete:uploaded -> processing,并入队一次", func() {
|
||||||
|
out, err := MediaAsset.AdminUploadComplete(ctx, tenantID, userID, asset.ID, nil, now)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(out.Status, ShouldEqual, consts.MediaAssetStatusProcessing)
|
||||||
|
|
||||||
|
var cnt int
|
||||||
|
err = s.DB.QueryRowContext(ctx, "SELECT COUNT(1) FROM river_job WHERE kind = $1", jobs_args.MediaAssetProcessJob{}.Kind()).Scan(&cnt)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(cnt, ShouldEqual, 1)
|
||||||
|
|
||||||
|
Convey("重复 upload_complete:仍可触发入队,但不会产生重复任务", func() {
|
||||||
|
out2, err := MediaAsset.AdminUploadComplete(ctx, tenantID, userID, asset.ID, nil, now.Add(1*time.Second))
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(out2.Status, ShouldEqual, consts.MediaAssetStatusProcessing)
|
||||||
|
|
||||||
|
err = s.DB.QueryRowContext(ctx, "SELECT COUNT(1) FROM river_job WHERE kind = $1", jobs_args.MediaAssetProcessJob{}.Kind()).Scan(&cnt)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
So(cnt, ShouldEqual, 1)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@@ -30,8 +30,12 @@ func Provide(opts ...opt.Option) error {
|
|||||||
}); err != nil {
|
}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := container.Container.Provide(func() (*mediaAsset, error) {
|
if err := container.Container.Provide(func(
|
||||||
obj := &mediaAsset{}
|
job *provider_job.Job,
|
||||||
|
) (*mediaAsset, error) {
|
||||||
|
obj := &mediaAsset{
|
||||||
|
job: job,
|
||||||
|
}
|
||||||
|
|
||||||
return obj, nil
|
return obj, nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user