@@ -2,7 +2,7 @@ package jobs
import (
"context"
"crypto/md5 "
"crypto/sha256 "
"encoding/hex"
"errors"
"io"
@@ -20,7 +20,7 @@ import (
"quyun/v2/providers/storage"
"github.com/riverqueue/river"
log "github.com/sirupsen/logrus"
logrus "github.com/sirupsen/logrus"
"go.ipao.vip/gen/types"
"gorm.io/gorm"
)
@@ -31,7 +31,7 @@ type MediaProcessWorker struct {
storage * storage . Storage
}
func ( j * MediaProcessWorker ) Work ( ctx context . Context , job * river . Job [ args . MediaAssetProcessJob ] ) error {
func ( worker * MediaProcessWorker ) Work ( ctx context . Context , job * river . Job [ args . MediaAssetProcessJob ] ) error {
arg := job . Args
// 1. 获取媒体资源,保证租户隔离。
tbl , q := models . MediaAssetQuery . QueryContext ( ctx )
@@ -42,9 +42,11 @@ func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.Media
asset , err := q . First ( )
if err != nil {
if errors . Is ( err , gorm . ErrRecordNotFound ) {
log . Warnf ( "media asset not found: %d" , arg . AssetID )
logrus . Warnf ( "media asset not found: %d" , arg . AssetID )
return river . JobCancel ( err )
}
return err
}
@@ -69,16 +71,16 @@ func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.Media
// 3. 处理视频( FFmpeg, 未安装时走模拟流程) 。
if asset . Type == consts . MediaAssetTypeVideo {
if strings . ToLower ( asset . Provider ) == "local" {
localPath := j . storage . Config . LocalPath
localPath := worker . storage . Config . LocalPath
if localPath == "" {
localPath = "./storage"
}
inputFile := filepath . Join ( localPath , asset . ObjectKey )
if _ , err := os . Stat ( inputFile ) ; err != nil {
log . Errorf ( "media file missing: %s, err=%v" , inputFile , err )
logrus . Errorf ( "media file missing: %s, err=%v" , inputFile , err )
finalStatus = consts . MediaAssetStatusFailed
} else if _ , err := exec . LookPath ( "ffmpeg" ) ; err != nil {
log . Warn ( "ffmpeg not found, skipping real transcoding" )
logrus . Warn ( "ffmpeg not found, skipping real transcoding" )
} else {
outputDir := filepath . Dir ( inputFile )
coverTempKey := asset . ObjectKey + ".jpg"
@@ -102,13 +104,12 @@ func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.Media
coverFile ,
)
if out , err := cmd . CombinedOutput ( ) ; err != nil {
log . Errorf ( "ffmpeg failed: %s, output: %s" , err , string ( out ) )
logrus . Errorf ( "ffmpeg failed: %s, output: %s" , err , string ( out ) )
finalStatus = consts . MediaAssetStatusFailed
} else {
log . Infof ( "Generated cover: %s" , coverFile )
// 生成封面资产记录,便于后台可追踪产物。
if err := j . registerCoverAsset ( ctx , asset , coverFile ) ; err != nil {
log . Errorf ( "register cover failed: %s" , err )
logrus . Infof ( "Generated cover: %s" , coverFile )
if err := worker . registerCoverAsset ( ctx , asset , coverFile ) ; err != nil {
logrus . Errorf ( "register cover failed: %s" , err )
finalStatus = consts . MediaAssetStatusFailed
}
}
@@ -116,17 +117,17 @@ func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.Media
} else {
tempDir , err := os . MkdirTemp ( "" , "media-process-" )
if err != nil {
log . Errorf ( "create temp dir failed: %v" , err )
logrus . Errorf ( "create temp dir failed: %v" , err )
finalStatus = consts . MediaAssetStatusFailed
} else {
defer os . RemoveAll ( tempDir )
ext := path . Ext ( asset . ObjectKey )
inputFile := filepath . Join ( tempDir , "source" + ext )
if err := j . storage . Download ( ctx , asset . ObjectKey , inputFile ) ; err != nil {
log . Errorf ( "download media file failed: %s, err=%v" , asset . ObjectKey , err )
if err := worker . storage . Download ( ctx , asset . ObjectKey , inputFile ) ; err != nil {
logrus . Errorf ( "download media file failed: %s, err=%v" , asset . ObjectKey , err )
finalStatus = consts . MediaAssetStatusFailed
} else if _ , err := exec . LookPath ( "ffmpeg" ) ; err != nil {
log . Warn ( "ffmpeg not found, skipping real transcoding" )
logrus . Warn ( "ffmpeg not found, skipping real transcoding" )
} else {
coverFile := filepath . Join ( tempDir , "cover.jpg" )
cmd := exec . CommandContext (
@@ -146,12 +147,12 @@ func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.Media
coverFile ,
)
if out , err := cmd . CombinedOutput ( ) ; err != nil {
log . Errorf ( "ffmpeg failed: %s, output: %s" , err , string ( out ) )
logrus . Errorf ( "ffmpeg failed: %s, output: %s" , err , string ( out ) )
finalStatus = consts . MediaAssetStatusFailed
} else {
log . Infof ( "Generated cover: %s" , coverFile )
if err := j . registerCoverAsset ( ctx , asset , coverFile ) ; err != nil {
log . Errorf ( "register cover failed: %s" , err )
logrus . Infof ( "Generated cover: %s" , coverFile )
if err := worker . registerCoverAsset ( ctx , asset , coverFile ) ; err != nil {
logrus . Errorf ( "register cover failed: %s" , err )
finalStatus = consts . MediaAssetStatusFailed
}
}
@@ -171,10 +172,11 @@ func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.Media
} ) . Error ; err != nil {
return err
}
return nil
}
func ( j * MediaProcessWorker ) registerCoverAsset ( ctx context . Context , asset * models . MediaAsset , coverFile string ) error {
func ( worker * MediaProcessWorker ) registerCoverAsset ( ctx context . Context , asset * models . MediaAsset , coverFile string ) error {
if asset == nil || coverFile == "" {
return nil
}
@@ -182,7 +184,6 @@ func (j *MediaProcessWorker) registerCoverAsset(ctx context.Context, asset *mode
return err
}
// 已存在封面派生资产时直接跳过。
tbl , q := models . MediaAssetQuery . QueryContext ( ctx )
existing , err := q . Where (
tbl . SourceAssetID . Eq ( asset . ID ) ,
@@ -195,7 +196,7 @@ func (j *MediaProcessWorker) registerCoverAsset(ctx context.Context, asset *mode
return err
}
hash , size , err := fileMD5 ( coverFile )
hash , size , err := fileSHA256 ( coverFile )
if err != nil {
return err
}
@@ -218,7 +219,7 @@ func (j *MediaProcessWorker) registerCoverAsset(ctx context.Context, asset *mode
objectKey := buildObjectKey ( tenant , hash , coverName )
if strings . ToLower ( asset . Provider ) == "local" {
localPath := j . storage . Config . LocalPath
localPath := worker . storage . Config . LocalPath
if localPath == "" {
localPath = "./storage"
}
@@ -234,7 +235,7 @@ func (j *MediaProcessWorker) registerCoverAsset(ctx context.Context, asset *mode
}
}
} else {
if err := j . storage . PutObject ( ctx , objectKey , coverFile , "image/jpeg" ) ; err != nil {
if err := worker . storage . PutObject ( ctx , objectKey , coverFile , "image/jpeg" ) ; err != nil {
return err
}
_ = os . Remove ( coverFile )
@@ -259,6 +260,7 @@ func (j *MediaProcessWorker) registerCoverAsset(ctx context.Context, asset *mode
if err := models . MediaAssetQuery . WithContext ( ctx ) . Create ( coverAsset ) ; err != nil {
return err
}
return nil
}
@@ -267,30 +269,32 @@ func coverFilename(filename string) string {
if base == "" {
base = "cover"
}
return base + "_cover.jpg"
}
func buildObjectKey ( tenant * models . Tenant , hash , filename string ) string {
// 按租户维度组织对象路径: quyun/<tenant_uuid>/<hash>.<ext>
tenantUUID := "public"
if tenant != nil && tenant . UUID . String ( ) != "" {
tenantUUID = tenant . UUID . String ( )
}
ext := strings . ToLower ( filepath . Ext ( filename ) )
return path . Join ( "quyun" , tenantUUID , hash + ext )
}
func fileMD5 ( filename string ) ( string , int64 , error ) {
func fileSHA256 ( filename string ) ( string , int64 , error ) {
f , err := os . Open ( filename )
if err != nil {
return "" , 0 , err
}
defer f . Close ( )
h := md5 . New ( )
h := sha256 . New ( )
size , err := io . Copy ( h , f )
if err != nil {
return "" , size , err
}
return hex . EncodeToString ( h . Sum ( nil ) ) , size , nil
}