Files
quyun-v2/backend/app/jobs/media_process_job.go

115 lines
3.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package jobs
import (
"context"
"errors"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"quyun/v2/app/jobs/args"
"quyun/v2/database/models"
"quyun/v2/pkg/consts"
"quyun/v2/providers/storage"
"github.com/riverqueue/river"
log "github.com/sirupsen/logrus"
"gorm.io/gorm"
)
// @provider(job)
type MediaProcessWorker struct {
river.WorkerDefaults[args.MediaAssetProcessJob]
storage *storage.Storage
}
func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.MediaAssetProcessJob]) error {
arg := job.Args
// 1. 获取媒体资源,保证租户隔离。
tbl, q := models.MediaAssetQuery.QueryContext(ctx)
q = q.Where(tbl.ID.Eq(arg.AssetID))
if arg.TenantID > 0 {
q = q.Where(tbl.TenantID.Eq(arg.TenantID))
}
asset, err := q.First()
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
log.Warnf("media asset not found: %d", arg.AssetID)
return river.JobCancel(err)
}
return err
}
if asset.Status == consts.MediaAssetStatusReady || asset.Status == consts.MediaAssetStatusDeleted {
return nil
}
// 2. 更新状态为处理中,标识处理已开始。
_, err = models.MediaAssetQuery.WithContext(ctx).
Where(models.MediaAssetQuery.ID.Eq(asset.ID)).
UpdateSimple(
models.MediaAssetQuery.Status.Value(consts.MediaAssetStatusProcessing),
models.MediaAssetQuery.UpdatedAt.Value(time.Now()),
)
if err != nil {
return err
}
finalStatus := consts.MediaAssetStatusReady
// 3. 处理视频FFmpeg未安装时走模拟流程
if asset.Type == consts.MediaAssetTypeVideo {
if strings.ToLower(asset.Provider) == "local" {
localPath := j.storage.Config.LocalPath
if localPath == "" {
localPath = "./storage"
}
inputFile := filepath.Join(localPath, asset.ObjectKey)
if _, err := os.Stat(inputFile); err != nil {
log.Errorf("media file missing: %s, err=%v", inputFile, err)
finalStatus = consts.MediaAssetStatusFailed
} else if _, err := exec.LookPath("ffmpeg"); err != nil {
log.Warn("ffmpeg not found, skipping real transcoding")
} else {
outputDir := filepath.Dir(inputFile)
coverKey := asset.ObjectKey + ".jpg"
coverFile := filepath.Join(outputDir, filepath.Base(coverKey))
// 生成封面,作为后续管线的占位输出。
cmd := exec.CommandContext(
ctx,
"ffmpeg",
"-y",
"-i",
inputFile,
"-ss",
"00:00:01.000",
"-vframes",
"1",
coverFile,
)
if out, err := cmd.CombinedOutput(); err != nil {
log.Errorf("ffmpeg failed: %s, output: %s", err, string(out))
finalStatus = consts.MediaAssetStatusFailed
} else {
log.Infof("Generated cover: %s", coverFile)
// TODO: 生成封面资产或写入 meta由后续管线接管。
}
}
} else {
log.Warn("non-local provider, skipping ffmpeg processing")
}
}
// 4. 更新最终状态。
_, err = models.MediaAssetQuery.WithContext(ctx).
Where(models.MediaAssetQuery.ID.Eq(asset.ID)).
Updates(&models.MediaAsset{
Status: finalStatus,
UpdatedAt: time.Now(),
})
return err
}