feat: enqueue media asset processing
This commit is contained in:
@@ -2,8 +2,11 @@ package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"quyun/v2/app/jobs/args"
|
||||
@@ -13,74 +16,98 @@ import (
|
||||
|
||||
"github.com/riverqueue/river"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// @provider(job)
|
||||
type MediaProcessWorker struct {
|
||||
river.WorkerDefaults[args.MediaProcessArgs]
|
||||
river.WorkerDefaults[args.MediaAssetProcessJob]
|
||||
storage *storage.Storage
|
||||
}
|
||||
|
||||
func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.MediaProcessArgs]) error {
|
||||
func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.MediaAssetProcessJob]) error {
|
||||
arg := job.Args
|
||||
// 1. Fetch Asset
|
||||
asset, err := models.MediaAssetQuery.WithContext(ctx).Where(models.MediaAssetQuery.ID.Eq(arg.AssetID)).First()
|
||||
// 1. 获取媒体资源,保证租户隔离。
|
||||
tbl, q := models.MediaAssetQuery.QueryContext(ctx)
|
||||
q = q.Where(tbl.ID.Eq(arg.AssetID))
|
||||
if arg.TenantID > 0 {
|
||||
q = q.Where(tbl.TenantID.Eq(arg.TenantID))
|
||||
}
|
||||
asset, err := q.First()
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
log.Warnf("media asset not found: %d", arg.AssetID)
|
||||
return river.JobCancel(err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// 2. Update status to processing
|
||||
if asset.Status == consts.MediaAssetStatusReady || asset.Status == consts.MediaAssetStatusDeleted {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 2. 更新状态为处理中,标识处理已开始。
|
||||
_, err = models.MediaAssetQuery.WithContext(ctx).
|
||||
Where(models.MediaAssetQuery.ID.Eq(asset.ID)).
|
||||
UpdateSimple(models.MediaAssetQuery.Status.Value(consts.MediaAssetStatusProcessing))
|
||||
UpdateSimple(
|
||||
models.MediaAssetQuery.Status.Value(consts.MediaAssetStatusProcessing),
|
||||
models.MediaAssetQuery.UpdatedAt.Value(time.Now()),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 3. Process Video (FFmpeg)
|
||||
finalStatus := consts.MediaAssetStatusReady
|
||||
|
||||
// 3. 处理视频(FFmpeg,未安装时走模拟流程)。
|
||||
if asset.Type == consts.MediaAssetTypeVideo {
|
||||
if _, err := exec.LookPath("ffmpeg"); err == nil {
|
||||
if strings.ToLower(asset.Provider) == "local" {
|
||||
localPath := j.storage.Config.LocalPath
|
||||
if localPath == "" {
|
||||
localPath = "./storage"
|
||||
}
|
||||
inputFile := filepath.Join(localPath, asset.ObjectKey)
|
||||
outputDir := filepath.Dir(inputFile)
|
||||
// Simple transcoding: convert to MP4 (mocking complex HLS for simplicity)
|
||||
// Or just extract cover
|
||||
coverKey := asset.ObjectKey + ".jpg"
|
||||
coverFile := filepath.Join(outputDir, filepath.Base(coverKey))
|
||||
|
||||
// Generate Cover
|
||||
cmd := exec.CommandContext(
|
||||
ctx,
|
||||
"ffmpeg",
|
||||
"-y",
|
||||
"-i",
|
||||
inputFile,
|
||||
"-ss",
|
||||
"00:00:01.000",
|
||||
"-vframes",
|
||||
"1",
|
||||
coverFile,
|
||||
)
|
||||
if out, err := cmd.CombinedOutput(); err != nil {
|
||||
log.Errorf("ffmpeg failed: %s, output: %s", err, string(out))
|
||||
// Don't fail the job, just skip cover
|
||||
if _, err := os.Stat(inputFile); err != nil {
|
||||
log.Errorf("media file missing: %s, err=%v", inputFile, err)
|
||||
finalStatus = consts.MediaAssetStatusFailed
|
||||
} else if _, err := exec.LookPath("ffmpeg"); err != nil {
|
||||
log.Warn("ffmpeg not found, skipping real transcoding")
|
||||
} else {
|
||||
log.Infof("Generated cover: %s", coverFile)
|
||||
// TODO: Create MediaAsset for cover? Or update meta?
|
||||
outputDir := filepath.Dir(inputFile)
|
||||
coverKey := asset.ObjectKey + ".jpg"
|
||||
coverFile := filepath.Join(outputDir, filepath.Base(coverKey))
|
||||
|
||||
// 生成封面,作为后续管线的占位输出。
|
||||
cmd := exec.CommandContext(
|
||||
ctx,
|
||||
"ffmpeg",
|
||||
"-y",
|
||||
"-i",
|
||||
inputFile,
|
||||
"-ss",
|
||||
"00:00:01.000",
|
||||
"-vframes",
|
||||
"1",
|
||||
coverFile,
|
||||
)
|
||||
if out, err := cmd.CombinedOutput(); err != nil {
|
||||
log.Errorf("ffmpeg failed: %s, output: %s", err, string(out))
|
||||
finalStatus = consts.MediaAssetStatusFailed
|
||||
} else {
|
||||
log.Infof("Generated cover: %s", coverFile)
|
||||
// TODO: 生成封面资产或写入 meta,由后续管线接管。
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Warn("ffmpeg not found, skipping real transcoding")
|
||||
log.Warn("non-local provider, skipping ffmpeg processing")
|
||||
}
|
||||
}
|
||||
|
||||
// 4. Update status to ready
|
||||
// 4. 更新最终状态。
|
||||
_, err = models.MediaAssetQuery.WithContext(ctx).
|
||||
Where(models.MediaAssetQuery.ID.Eq(asset.ID)).
|
||||
Updates(&models.MediaAsset{
|
||||
Status: consts.MediaAssetStatusReady,
|
||||
Status: finalStatus,
|
||||
UpdatedAt: time.Now(),
|
||||
})
|
||||
return err
|
||||
|
||||
@@ -18,10 +18,12 @@ import (
|
||||
|
||||
"quyun/v2/app/errorx"
|
||||
common_dto "quyun/v2/app/http/v1/dto"
|
||||
"quyun/v2/app/jobs/args"
|
||||
"quyun/v2/app/requests"
|
||||
"quyun/v2/database/fields"
|
||||
"quyun/v2/database/models"
|
||||
"quyun/v2/pkg/consts"
|
||||
"quyun/v2/providers/job"
|
||||
"quyun/v2/providers/storage"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@@ -33,6 +35,7 @@ import (
|
||||
// @provider
|
||||
type common struct {
|
||||
storage *storage.Storage
|
||||
job *job.Job
|
||||
}
|
||||
|
||||
func (s *common) Options(ctx context.Context) (*common_dto.OptionsResponse, error) {
|
||||
@@ -88,7 +91,7 @@ func (s *common) CheckHash(ctx context.Context, tenantID, userID int64, hash str
|
||||
TenantID: tid,
|
||||
UserID: userID,
|
||||
Type: existing.Type,
|
||||
Status: consts.MediaAssetStatusUploaded,
|
||||
Status: existing.Status,
|
||||
Provider: existing.Provider,
|
||||
Bucket: existing.Bucket,
|
||||
ObjectKey: existing.ObjectKey,
|
||||
@@ -99,6 +102,9 @@ func (s *common) CheckHash(ctx context.Context, tenantID, userID int64, hash str
|
||||
if err := models.MediaAssetQuery.WithContext(ctx).Create(asset); err != nil {
|
||||
return nil, errorx.ErrDatabaseError.WithCause(err)
|
||||
}
|
||||
if err := s.enqueueMediaProcess(ctx, tid, asset); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s.composeUploadResult(asset), nil
|
||||
}
|
||||
@@ -341,7 +347,7 @@ func (s *common) CompleteUpload(ctx context.Context, tenantID, userID int64, for
|
||||
TenantID: tid,
|
||||
UserID: userID,
|
||||
Type: consts.MediaAssetType(meta.Type),
|
||||
Status: consts.MediaAssetStatusUploaded,
|
||||
Status: existing.Status,
|
||||
Provider: existing.Provider,
|
||||
Bucket: existing.Bucket,
|
||||
ObjectKey: existing.ObjectKey,
|
||||
@@ -360,7 +366,7 @@ func (s *common) CompleteUpload(ctx context.Context, tenantID, userID int64, for
|
||||
TenantID: tid,
|
||||
UserID: userID,
|
||||
Type: consts.MediaAssetType(meta.Type),
|
||||
Status: consts.MediaAssetStatusUploaded,
|
||||
Status: s.initialMediaStatus(consts.MediaAssetType(meta.Type)),
|
||||
Provider: s.storage.Provider(),
|
||||
Bucket: s.storage.Bucket(),
|
||||
ObjectKey: objectKey,
|
||||
@@ -376,6 +382,9 @@ func (s *common) CompleteUpload(ctx context.Context, tenantID, userID int64, for
|
||||
if err := models.MediaAssetQuery.WithContext(ctx).Create(asset); err != nil {
|
||||
return nil, errorx.ErrDatabaseError.WithCause(err)
|
||||
}
|
||||
if err := s.enqueueMediaProcess(ctx, tid, asset); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s.composeUploadResult(asset), nil
|
||||
}
|
||||
@@ -505,7 +514,7 @@ func (s *common) Upload(
|
||||
TenantID: tid,
|
||||
UserID: userID,
|
||||
Type: consts.MediaAssetType(typeArg),
|
||||
Status: consts.MediaAssetStatusUploaded,
|
||||
Status: existing.Status,
|
||||
Provider: existing.Provider,
|
||||
Bucket: existing.Bucket,
|
||||
ObjectKey: existing.ObjectKey, // Reuse key
|
||||
@@ -527,7 +536,7 @@ func (s *common) Upload(
|
||||
TenantID: tid,
|
||||
UserID: userID,
|
||||
Type: consts.MediaAssetType(typeArg),
|
||||
Status: consts.MediaAssetStatusUploaded,
|
||||
Status: s.initialMediaStatus(consts.MediaAssetType(typeArg)),
|
||||
Provider: s.storage.Provider(),
|
||||
Bucket: s.storage.Bucket(),
|
||||
ObjectKey: objectKey,
|
||||
@@ -542,6 +551,9 @@ func (s *common) Upload(
|
||||
if err := models.MediaAssetQuery.WithContext(ctx).Create(asset); err != nil {
|
||||
return nil, errorx.ErrDatabaseError.WithCause(err)
|
||||
}
|
||||
if err := s.enqueueMediaProcess(ctx, tid, asset); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s.composeUploadResult(asset), nil
|
||||
}
|
||||
@@ -575,6 +587,48 @@ func (s *common) GetAssetURL(objectKey string) string {
|
||||
return url
|
||||
}
|
||||
|
||||
func (s *common) initialMediaStatus(mediaType consts.MediaAssetType) consts.MediaAssetStatus {
|
||||
if s.needsMediaProcess(mediaType) {
|
||||
return consts.MediaAssetStatusUploaded
|
||||
}
|
||||
return consts.MediaAssetStatusReady
|
||||
}
|
||||
|
||||
func (s *common) needsMediaProcess(mediaType consts.MediaAssetType) bool {
|
||||
return mediaType == consts.MediaAssetTypeVideo
|
||||
}
|
||||
|
||||
func (s *common) enqueueMediaProcess(ctx context.Context, tenantID int64, asset *models.MediaAsset) error {
|
||||
if asset == nil {
|
||||
return nil
|
||||
}
|
||||
if !s.needsMediaProcess(asset.Type) {
|
||||
return nil
|
||||
}
|
||||
// 测试环境或本地快速验证时直接模拟处理,避免依赖任务系统。
|
||||
if os.Getenv("JOB_INLINE") == "1" {
|
||||
_, err := models.MediaAssetQuery.WithContext(ctx).
|
||||
Where(models.MediaAssetQuery.ID.Eq(asset.ID)).
|
||||
UpdateSimple(
|
||||
models.MediaAssetQuery.Status.Value(consts.MediaAssetStatusReady),
|
||||
models.MediaAssetQuery.UpdatedAt.Value(time.Now()),
|
||||
)
|
||||
if err != nil {
|
||||
return errorx.ErrDatabaseError.WithCause(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
arg := args.MediaAssetProcessJob{
|
||||
TenantID: tenantID,
|
||||
AssetID: asset.ID,
|
||||
}
|
||||
if err := s.job.Add(arg); err != nil {
|
||||
return errorx.ErrInternalError.WithCause(err).WithMsg("添加媒体处理任务失败")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func retryCriticalWrite(ctx context.Context, fn func() error) error {
|
||||
backoffs := []time.Duration{
|
||||
50 * time.Millisecond,
|
||||
|
||||
@@ -22,9 +22,11 @@ func Provide(opts ...opt.Option) error {
|
||||
return err
|
||||
}
|
||||
if err := container.Container.Provide(func(
|
||||
job *job.Job,
|
||||
storage *storage.Storage,
|
||||
) (*common, error) {
|
||||
obj := &common{
|
||||
job: job,
|
||||
storage: storage,
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user