diff --git a/TODO.md b/TODO.md index 4a735d9..5a347c3 100644 --- a/TODO.md +++ b/TODO.md @@ -7,8 +7,8 @@ - [x] Migrated notification dispatch to use River. ## 2. Media & Infra -- [ ] **Real Storage**: Implement S3/MinIO provider for presigned URLs. -- [ ] **Media Pipeline**: Implement FFmpeg integration (mock call in job). +- [ ] **Real Storage (LAST)**: S3/MinIO integration test to be done at final stage (current dev uses local FS). +- [x] **Media Pipeline**: Implement FFmpeg integration (mock call in job). ## 3. Growth - [ ] **Coupons**: Implement coupon logic. diff --git a/backend/app/jobs/media_process_job.go b/backend/app/jobs/media_process_job.go index 63ac223..017046b 100644 --- a/backend/app/jobs/media_process_job.go +++ b/backend/app/jobs/media_process_job.go @@ -2,8 +2,11 @@ package jobs import ( "context" + "errors" + "os" "os/exec" "path/filepath" + "strings" "time" "quyun/v2/app/jobs/args" @@ -13,74 +16,98 @@ import ( "github.com/riverqueue/river" log "github.com/sirupsen/logrus" + "gorm.io/gorm" ) // @provider(job) type MediaProcessWorker struct { - river.WorkerDefaults[args.MediaProcessArgs] + river.WorkerDefaults[args.MediaAssetProcessJob] storage *storage.Storage } -func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.MediaProcessArgs]) error { +func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.MediaAssetProcessJob]) error { arg := job.Args - // 1. Fetch Asset - asset, err := models.MediaAssetQuery.WithContext(ctx).Where(models.MediaAssetQuery.ID.Eq(arg.AssetID)).First() + // 1. 获取媒体资源,保证租户隔离。 + tbl, q := models.MediaAssetQuery.QueryContext(ctx) + q = q.Where(tbl.ID.Eq(arg.AssetID)) + if arg.TenantID > 0 { + q = q.Where(tbl.TenantID.Eq(arg.TenantID)) + } + asset, err := q.First() if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + log.Warnf("media asset not found: %d", arg.AssetID) + return river.JobCancel(err) + } return err } - // 2. Update status to processing + if asset.Status == consts.MediaAssetStatusReady || asset.Status == consts.MediaAssetStatusDeleted { + return nil + } + + // 2. 更新状态为处理中,标识处理已开始。 _, err = models.MediaAssetQuery.WithContext(ctx). Where(models.MediaAssetQuery.ID.Eq(asset.ID)). - UpdateSimple(models.MediaAssetQuery.Status.Value(consts.MediaAssetStatusProcessing)) + UpdateSimple( + models.MediaAssetQuery.Status.Value(consts.MediaAssetStatusProcessing), + models.MediaAssetQuery.UpdatedAt.Value(time.Now()), + ) if err != nil { return err } - // 3. Process Video (FFmpeg) + finalStatus := consts.MediaAssetStatusReady + + // 3. 处理视频(FFmpeg,未安装时走模拟流程)。 if asset.Type == consts.MediaAssetTypeVideo { - if _, err := exec.LookPath("ffmpeg"); err == nil { + if strings.ToLower(asset.Provider) == "local" { localPath := j.storage.Config.LocalPath if localPath == "" { localPath = "./storage" } inputFile := filepath.Join(localPath, asset.ObjectKey) - outputDir := filepath.Dir(inputFile) - // Simple transcoding: convert to MP4 (mocking complex HLS for simplicity) - // Or just extract cover - coverKey := asset.ObjectKey + ".jpg" - coverFile := filepath.Join(outputDir, filepath.Base(coverKey)) - - // Generate Cover - cmd := exec.CommandContext( - ctx, - "ffmpeg", - "-y", - "-i", - inputFile, - "-ss", - "00:00:01.000", - "-vframes", - "1", - coverFile, - ) - if out, err := cmd.CombinedOutput(); err != nil { - log.Errorf("ffmpeg failed: %s, output: %s", err, string(out)) - // Don't fail the job, just skip cover + if _, err := os.Stat(inputFile); err != nil { + log.Errorf("media file missing: %s, err=%v", inputFile, err) + finalStatus = consts.MediaAssetStatusFailed + } else if _, err := exec.LookPath("ffmpeg"); err != nil { + log.Warn("ffmpeg not found, skipping real transcoding") } else { - log.Infof("Generated cover: %s", coverFile) - // TODO: Create MediaAsset for cover? Or update meta? + outputDir := filepath.Dir(inputFile) + coverKey := asset.ObjectKey + ".jpg" + coverFile := filepath.Join(outputDir, filepath.Base(coverKey)) + + // 生成封面,作为后续管线的占位输出。 + cmd := exec.CommandContext( + ctx, + "ffmpeg", + "-y", + "-i", + inputFile, + "-ss", + "00:00:01.000", + "-vframes", + "1", + coverFile, + ) + if out, err := cmd.CombinedOutput(); err != nil { + log.Errorf("ffmpeg failed: %s, output: %s", err, string(out)) + finalStatus = consts.MediaAssetStatusFailed + } else { + log.Infof("Generated cover: %s", coverFile) + // TODO: 生成封面资产或写入 meta,由后续管线接管。 + } } } else { - log.Warn("ffmpeg not found, skipping real transcoding") + log.Warn("non-local provider, skipping ffmpeg processing") } } - // 4. Update status to ready + // 4. 更新最终状态。 _, err = models.MediaAssetQuery.WithContext(ctx). Where(models.MediaAssetQuery.ID.Eq(asset.ID)). Updates(&models.MediaAsset{ - Status: consts.MediaAssetStatusReady, + Status: finalStatus, UpdatedAt: time.Now(), }) return err diff --git a/backend/app/services/common.go b/backend/app/services/common.go index c86141d..c653ada 100644 --- a/backend/app/services/common.go +++ b/backend/app/services/common.go @@ -18,10 +18,12 @@ import ( "quyun/v2/app/errorx" common_dto "quyun/v2/app/http/v1/dto" + "quyun/v2/app/jobs/args" "quyun/v2/app/requests" "quyun/v2/database/fields" "quyun/v2/database/models" "quyun/v2/pkg/consts" + "quyun/v2/providers/job" "quyun/v2/providers/storage" "github.com/google/uuid" @@ -33,6 +35,7 @@ import ( // @provider type common struct { storage *storage.Storage + job *job.Job } func (s *common) Options(ctx context.Context) (*common_dto.OptionsResponse, error) { @@ -88,7 +91,7 @@ func (s *common) CheckHash(ctx context.Context, tenantID, userID int64, hash str TenantID: tid, UserID: userID, Type: existing.Type, - Status: consts.MediaAssetStatusUploaded, + Status: existing.Status, Provider: existing.Provider, Bucket: existing.Bucket, ObjectKey: existing.ObjectKey, @@ -99,6 +102,9 @@ func (s *common) CheckHash(ctx context.Context, tenantID, userID int64, hash str if err := models.MediaAssetQuery.WithContext(ctx).Create(asset); err != nil { return nil, errorx.ErrDatabaseError.WithCause(err) } + if err := s.enqueueMediaProcess(ctx, tid, asset); err != nil { + return nil, err + } return s.composeUploadResult(asset), nil } @@ -341,7 +347,7 @@ func (s *common) CompleteUpload(ctx context.Context, tenantID, userID int64, for TenantID: tid, UserID: userID, Type: consts.MediaAssetType(meta.Type), - Status: consts.MediaAssetStatusUploaded, + Status: existing.Status, Provider: existing.Provider, Bucket: existing.Bucket, ObjectKey: existing.ObjectKey, @@ -360,7 +366,7 @@ func (s *common) CompleteUpload(ctx context.Context, tenantID, userID int64, for TenantID: tid, UserID: userID, Type: consts.MediaAssetType(meta.Type), - Status: consts.MediaAssetStatusUploaded, + Status: s.initialMediaStatus(consts.MediaAssetType(meta.Type)), Provider: s.storage.Provider(), Bucket: s.storage.Bucket(), ObjectKey: objectKey, @@ -376,6 +382,9 @@ func (s *common) CompleteUpload(ctx context.Context, tenantID, userID int64, for if err := models.MediaAssetQuery.WithContext(ctx).Create(asset); err != nil { return nil, errorx.ErrDatabaseError.WithCause(err) } + if err := s.enqueueMediaProcess(ctx, tid, asset); err != nil { + return nil, err + } return s.composeUploadResult(asset), nil } @@ -505,7 +514,7 @@ func (s *common) Upload( TenantID: tid, UserID: userID, Type: consts.MediaAssetType(typeArg), - Status: consts.MediaAssetStatusUploaded, + Status: existing.Status, Provider: existing.Provider, Bucket: existing.Bucket, ObjectKey: existing.ObjectKey, // Reuse key @@ -527,7 +536,7 @@ func (s *common) Upload( TenantID: tid, UserID: userID, Type: consts.MediaAssetType(typeArg), - Status: consts.MediaAssetStatusUploaded, + Status: s.initialMediaStatus(consts.MediaAssetType(typeArg)), Provider: s.storage.Provider(), Bucket: s.storage.Bucket(), ObjectKey: objectKey, @@ -542,6 +551,9 @@ func (s *common) Upload( if err := models.MediaAssetQuery.WithContext(ctx).Create(asset); err != nil { return nil, errorx.ErrDatabaseError.WithCause(err) } + if err := s.enqueueMediaProcess(ctx, tid, asset); err != nil { + return nil, err + } return s.composeUploadResult(asset), nil } @@ -575,6 +587,48 @@ func (s *common) GetAssetURL(objectKey string) string { return url } +func (s *common) initialMediaStatus(mediaType consts.MediaAssetType) consts.MediaAssetStatus { + if s.needsMediaProcess(mediaType) { + return consts.MediaAssetStatusUploaded + } + return consts.MediaAssetStatusReady +} + +func (s *common) needsMediaProcess(mediaType consts.MediaAssetType) bool { + return mediaType == consts.MediaAssetTypeVideo +} + +func (s *common) enqueueMediaProcess(ctx context.Context, tenantID int64, asset *models.MediaAsset) error { + if asset == nil { + return nil + } + if !s.needsMediaProcess(asset.Type) { + return nil + } + // 测试环境或本地快速验证时直接模拟处理,避免依赖任务系统。 + if os.Getenv("JOB_INLINE") == "1" { + _, err := models.MediaAssetQuery.WithContext(ctx). + Where(models.MediaAssetQuery.ID.Eq(asset.ID)). + UpdateSimple( + models.MediaAssetQuery.Status.Value(consts.MediaAssetStatusReady), + models.MediaAssetQuery.UpdatedAt.Value(time.Now()), + ) + if err != nil { + return errorx.ErrDatabaseError.WithCause(err) + } + return nil + } + + arg := args.MediaAssetProcessJob{ + TenantID: tenantID, + AssetID: asset.ID, + } + if err := s.job.Add(arg); err != nil { + return errorx.ErrInternalError.WithCause(err).WithMsg("添加媒体处理任务失败") + } + return nil +} + func retryCriticalWrite(ctx context.Context, fn func() error) error { backoffs := []time.Duration{ 50 * time.Millisecond, diff --git a/backend/app/services/provider.gen.go b/backend/app/services/provider.gen.go index f537319..5e760f9 100755 --- a/backend/app/services/provider.gen.go +++ b/backend/app/services/provider.gen.go @@ -22,9 +22,11 @@ func Provide(opts ...opt.Option) error { return err } if err := container.Container.Provide(func( + job *job.Job, storage *storage.Storage, ) (*common, error) { obj := &common{ + job: job, storage: storage, }