249 lines
6.4 KiB
Go
249 lines
6.4 KiB
Go
package jobs
|
||
|
||
import (
|
||
"context"
|
||
"crypto/md5"
|
||
"encoding/hex"
|
||
"errors"
|
||
"io"
|
||
"os"
|
||
"os/exec"
|
||
"path"
|
||
"path/filepath"
|
||
"strings"
|
||
"time"
|
||
|
||
"quyun/v2/app/jobs/args"
|
||
"quyun/v2/database/fields"
|
||
"quyun/v2/database/models"
|
||
"quyun/v2/pkg/consts"
|
||
"quyun/v2/providers/storage"
|
||
|
||
"github.com/riverqueue/river"
|
||
log "github.com/sirupsen/logrus"
|
||
"go.ipao.vip/gen/types"
|
||
"gorm.io/gorm"
|
||
)
|
||
|
||
// MediaProcessWorker is the River worker that handles
// args.MediaAssetProcessJob jobs.
//
// @provider(job)
type MediaProcessWorker struct {
	river.WorkerDefaults[args.MediaAssetProcessJob]

	// storage supplies the storage configuration; its Config.LocalPath is
	// read to resolve asset files on the local disk.
	storage *storage.Storage
}
|
||
|
||
func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.MediaAssetProcessJob]) error {
|
||
arg := job.Args
|
||
// 1. 获取媒体资源,保证租户隔离。
|
||
tbl, q := models.MediaAssetQuery.QueryContext(ctx)
|
||
q = q.Where(tbl.ID.Eq(arg.AssetID))
|
||
if arg.TenantID > 0 {
|
||
q = q.Where(tbl.TenantID.Eq(arg.TenantID))
|
||
}
|
||
asset, err := q.First()
|
||
if err != nil {
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
log.Warnf("media asset not found: %d", arg.AssetID)
|
||
return river.JobCancel(err)
|
||
}
|
||
return err
|
||
}
|
||
|
||
if asset.Status == consts.MediaAssetStatusReady || asset.Status == consts.MediaAssetStatusDeleted {
|
||
return nil
|
||
}
|
||
|
||
// 2. 更新状态为处理中,标识处理已开始。
|
||
if err := models.MediaAssetQuery.WithContext(ctx).
|
||
UnderlyingDB().
|
||
Model(&models.MediaAsset{}).
|
||
Where("id = ?", asset.ID).
|
||
Updates(map[string]any{
|
||
"status": consts.MediaAssetStatusProcessing,
|
||
"updated_at": time.Now(),
|
||
}).Error; err != nil {
|
||
return err
|
||
}
|
||
|
||
finalStatus := consts.MediaAssetStatusReady
|
||
|
||
// 3. 处理视频(FFmpeg,未安装时走模拟流程)。
|
||
if asset.Type == consts.MediaAssetTypeVideo {
|
||
if strings.ToLower(asset.Provider) == "local" {
|
||
localPath := j.storage.Config.LocalPath
|
||
if localPath == "" {
|
||
localPath = "./storage"
|
||
}
|
||
inputFile := filepath.Join(localPath, asset.ObjectKey)
|
||
if _, err := os.Stat(inputFile); err != nil {
|
||
log.Errorf("media file missing: %s, err=%v", inputFile, err)
|
||
finalStatus = consts.MediaAssetStatusFailed
|
||
} else if _, err := exec.LookPath("ffmpeg"); err != nil {
|
||
log.Warn("ffmpeg not found, skipping real transcoding")
|
||
} else {
|
||
outputDir := filepath.Dir(inputFile)
|
||
coverTempKey := asset.ObjectKey + ".jpg"
|
||
coverFile := filepath.Join(outputDir, filepath.Base(coverTempKey))
|
||
|
||
// 生成封面,作为后续管线的占位输出。
|
||
cmd := exec.CommandContext(
|
||
ctx,
|
||
"ffmpeg",
|
||
"-y",
|
||
"-i",
|
||
inputFile,
|
||
"-ss",
|
||
"00:00:00.000",
|
||
"-vframes",
|
||
"1",
|
||
"-vf",
|
||
"format=yuv420p",
|
||
"-update",
|
||
"1",
|
||
coverFile,
|
||
)
|
||
if out, err := cmd.CombinedOutput(); err != nil {
|
||
log.Errorf("ffmpeg failed: %s, output: %s", err, string(out))
|
||
finalStatus = consts.MediaAssetStatusFailed
|
||
} else {
|
||
log.Infof("Generated cover: %s", coverFile)
|
||
// 生成封面资产记录,便于后台可追踪产物。
|
||
if err := j.registerCoverAsset(ctx, asset, coverFile); err != nil {
|
||
log.Errorf("register cover failed: %s", err)
|
||
finalStatus = consts.MediaAssetStatusFailed
|
||
}
|
||
}
|
||
}
|
||
} else {
|
||
log.Warn("non-local provider, skipping ffmpeg processing")
|
||
}
|
||
}
|
||
|
||
// 4. 更新最终状态。
|
||
if err := models.MediaAssetQuery.WithContext(ctx).
|
||
UnderlyingDB().
|
||
Model(&models.MediaAsset{}).
|
||
Where("id = ?", asset.ID).
|
||
Updates(map[string]any{
|
||
"status": finalStatus,
|
||
"updated_at": time.Now(),
|
||
}).Error; err != nil {
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (j *MediaProcessWorker) registerCoverAsset(ctx context.Context, asset *models.MediaAsset, coverFile string) error {
|
||
if asset == nil || coverFile == "" {
|
||
return nil
|
||
}
|
||
if _, err := os.Stat(coverFile); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 已存在封面派生资产时直接跳过。
|
||
tbl, q := models.MediaAssetQuery.QueryContext(ctx)
|
||
existing, err := q.Where(
|
||
tbl.SourceAssetID.Eq(asset.ID),
|
||
tbl.Type.Eq(consts.MediaAssetTypeImage),
|
||
).First()
|
||
if err == nil && existing != nil {
|
||
return nil
|
||
}
|
||
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
||
return err
|
||
}
|
||
|
||
hash, size, err := fileMD5(coverFile)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
var tenant *models.Tenant
|
||
if asset.TenantID > 0 {
|
||
tenant, err = models.TenantQuery.WithContext(ctx).
|
||
Where(models.TenantQuery.ID.Eq(asset.TenantID)).
|
||
First()
|
||
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
||
return err
|
||
}
|
||
}
|
||
|
||
filename := asset.Meta.Data().Filename
|
||
if filename == "" {
|
||
filename = filepath.Base(asset.ObjectKey)
|
||
}
|
||
coverName := coverFilename(filename)
|
||
objectKey := buildObjectKey(tenant, hash, coverName)
|
||
|
||
// 本地存储将文件移动到目标 objectKey 位置,保持路径规范。
|
||
localPath := j.storage.Config.LocalPath
|
||
if localPath == "" {
|
||
localPath = "./storage"
|
||
}
|
||
dstPath := filepath.Join(localPath, filepath.FromSlash(objectKey))
|
||
if coverFile != dstPath {
|
||
if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil {
|
||
return err
|
||
}
|
||
if _, err := os.Stat(dstPath); err == nil {
|
||
_ = os.Remove(coverFile)
|
||
} else if err := os.Rename(coverFile, dstPath); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
coverAsset := &models.MediaAsset{
|
||
TenantID: asset.TenantID,
|
||
UserID: asset.UserID,
|
||
Type: consts.MediaAssetTypeImage,
|
||
Status: consts.MediaAssetStatusReady,
|
||
Provider: asset.Provider,
|
||
Bucket: asset.Bucket,
|
||
ObjectKey: objectKey,
|
||
Hash: hash,
|
||
Variant: consts.MediaAssetVariantMain,
|
||
SourceAssetID: asset.ID,
|
||
Meta: types.NewJSONType(fields.MediaAssetMeta{
|
||
Filename: coverName,
|
||
Size: size,
|
||
}),
|
||
}
|
||
if err := models.MediaAssetQuery.WithContext(ctx).Create(coverAsset); err != nil {
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// coverFilename derives the cover image name for a media filename: the
// extension is dropped and "_cover.jpg" is appended. When nothing remains
// after removing the extension, the stem "cover" is used instead.
func coverFilename(filename string) string {
	const (
		fallbackStem = "cover"
		coverSuffix  = "_cover.jpg"
	)

	stem := strings.TrimSuffix(filename, filepath.Ext(filename))
	if stem == "" {
		return fallbackStem + coverSuffix
	}
	return stem + coverSuffix
}
|
||
|
||
func buildObjectKey(tenant *models.Tenant, hash, filename string) string {
|
||
// 按租户维度组织对象路径:quyun/<tenant_uuid>/<hash>.<ext>
|
||
tenantUUID := "public"
|
||
if tenant != nil && tenant.UUID.String() != "" {
|
||
tenantUUID = tenant.UUID.String()
|
||
}
|
||
ext := strings.ToLower(filepath.Ext(filename))
|
||
return path.Join("quyun", tenantUUID, hash+ext)
|
||
}
|
||
|
||
// fileMD5 streams the named file through MD5 and returns the lower-case hex
// digest together with the number of bytes read.
//
// If the file cannot be opened it returns "", 0 and the open error. If
// reading fails part-way it returns "", the byte count copied so far, and the
// read error.
func fileMD5(filename string) (string, int64, error) {
	src, openErr := os.Open(filename)
	if openErr != nil {
		return "", 0, openErr
	}
	defer src.Close()

	digest := md5.New()
	n, copyErr := io.Copy(digest, src)
	if copyErr != nil {
		return "", n, copyErr
	}

	sum := digest.Sum(nil)
	return hex.EncodeToString(sum), n, nil
}
|