Files
quyun-v2/backend/app/jobs/media_process_job.go

301 lines
7.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package jobs
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"io"
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"time"
"quyun/v2/app/jobs/args"
"quyun/v2/database/fields"
"quyun/v2/database/models"
"quyun/v2/pkg/consts"
"quyun/v2/providers/storage"
"github.com/riverqueue/river"
logrus "github.com/sirupsen/logrus"
"go.ipao.vip/gen/types"
"gorm.io/gorm"
)
// MediaProcessWorker is the River worker for args.MediaAssetProcessJob jobs
// (see Work). This description is deliberately kept in its own comment group
// so the annotation line below stays untouched — presumably it is consumed by
// a provider code generator; confirm before merging the two comments.

// @provider(job)
type MediaProcessWorker struct {
	// Embedded River defaults for workers of this job argument type.
	river.WorkerDefaults[args.MediaAssetProcessJob]
	// storage supplies the local storage root (Config.LocalPath) and the
	// Download / PutObject calls used while processing assets.
	storage *storage.Storage
}
// Work processes a single media asset job.
//
// Steps:
//  1. Load the asset by ID, additionally filtered by tenant when the job
//     carries a positive tenant ID. A missing asset cancels the job via
//     river.JobCancel; any other lookup error is returned as-is. Assets that
//     are already ready or deleted are skipped.
//  2. Mark the asset as processing.
//  3. For video assets, extract a cover frame with ffmpeg and register it as
//     a derived image asset. When ffmpeg is not installed this step is
//     skipped and the asset is still treated as successfully processed.
//  4. Persist the final status (ready or failed).
//
// Processing failures are logged and recorded as the failed status; they do
// not make Work return an error. Only database errors are returned.
func (worker *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.MediaAssetProcessJob]) error {
	arg := job.Args

	// 1. Fetch the media asset, keeping tenant isolation.
	tbl, q := models.MediaAssetQuery.QueryContext(ctx)
	q = q.Where(tbl.ID.Eq(arg.AssetID))
	if arg.TenantID > 0 {
		q = q.Where(tbl.TenantID.Eq(arg.TenantID))
	}
	asset, err := q.First()
	if err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			logrus.Warnf("media asset not found: %d", arg.AssetID)
			return river.JobCancel(err)
		}
		return err
	}
	if asset.Status == consts.MediaAssetStatusReady || asset.Status == consts.MediaAssetStatusDeleted {
		return nil
	}

	// 2. Flag the asset as processing so it is visible that work has started.
	if err := worker.updateAssetStatus(ctx, asset, consts.MediaAssetStatusProcessing); err != nil {
		return err
	}

	// 3. Process video assets (falls back to a simulated flow without ffmpeg).
	finalStatus := consts.MediaAssetStatusReady
	if asset.Type == consts.MediaAssetTypeVideo && !worker.processVideoAsset(ctx, asset) {
		finalStatus = consts.MediaAssetStatusFailed
	}

	// 4. Persist the final status.
	return worker.updateAssetStatus(ctx, asset, finalStatus)
}

// updateAssetStatus writes status and a fresh updated_at onto the asset's row.
// status is typed as any because it is only forwarded into the update map.
func (worker *MediaProcessWorker) updateAssetStatus(ctx context.Context, asset *models.MediaAsset, status any) error {
	return models.MediaAssetQuery.WithContext(ctx).
		UnderlyingDB().
		Model(&models.MediaAsset{}).
		Where("id = ?", asset.ID).
		Updates(map[string]any{
			"status":     status,
			"updated_at": time.Now(),
		}).Error
}

// processVideoAsset dispatches on the asset's storage provider and reports
// whether processing succeeded. Failures are logged by the helpers.
func (worker *MediaProcessWorker) processVideoAsset(ctx context.Context, asset *models.MediaAsset) bool {
	if strings.EqualFold(asset.Provider, "local") {
		return worker.processLocalVideo(ctx, asset)
	}
	return worker.processRemoteVideo(ctx, asset)
}

// processLocalVideo handles a video stored under the local storage root; the
// cover is written next to the source file as "<object key base name>.jpg".
func (worker *MediaProcessWorker) processLocalVideo(ctx context.Context, asset *models.MediaAsset) bool {
	localPath := worker.storage.Config.LocalPath
	if localPath == "" {
		localPath = "./storage"
	}
	inputFile := filepath.Join(localPath, asset.ObjectKey)
	if _, err := os.Stat(inputFile); err != nil {
		logrus.Errorf("media file missing: %s, err=%v", inputFile, err)
		return false
	}
	coverFile := filepath.Join(filepath.Dir(inputFile), filepath.Base(asset.ObjectKey+".jpg"))
	return worker.generateCover(ctx, asset, inputFile, coverFile)
}

// processRemoteVideo downloads the video into a temporary directory, builds
// the cover there, and removes the directory before returning.
func (worker *MediaProcessWorker) processRemoteVideo(ctx context.Context, asset *models.MediaAsset) bool {
	tempDir, err := os.MkdirTemp("", "media-process-")
	if err != nil {
		logrus.Errorf("create temp dir failed: %v", err)
		return false
	}
	// Best-effort cleanup: a leftover temp dir must not fail the job.
	defer os.RemoveAll(tempDir)

	// Object keys are slash-separated, hence path.Ext rather than filepath.Ext.
	inputFile := filepath.Join(tempDir, "source"+path.Ext(asset.ObjectKey))
	if err := worker.storage.Download(ctx, asset.ObjectKey, inputFile); err != nil {
		logrus.Errorf("download media file failed: %s, err=%v", asset.ObjectKey, err)
		return false
	}
	return worker.generateCover(ctx, asset, inputFile, filepath.Join(tempDir, "cover.jpg"))
}

// generateCover grabs the first frame of inputFile into coverFile with ffmpeg
// and registers it as the asset's cover. It reports success; a missing ffmpeg
// binary is logged and counted as success (simulated flow).
func (worker *MediaProcessWorker) generateCover(ctx context.Context, asset *models.MediaAsset, inputFile, coverFile string) bool {
	if _, err := exec.LookPath("ffmpeg"); err != nil {
		logrus.Warn("ffmpeg not found, skipping real transcoding")
		return true
	}

	// Produce the cover; it serves as a placeholder output for the later pipeline.
	cmd := exec.CommandContext(
		ctx,
		"ffmpeg",
		"-y",
		"-i",
		inputFile,
		"-ss",
		"00:00:00.000",
		"-vframes",
		"1",
		"-vf",
		"format=yuv420p",
		"-update",
		"1",
		coverFile,
	)
	if out, err := cmd.CombinedOutput(); err != nil {
		logrus.Errorf("ffmpeg failed: %s, output: %s", err, string(out))
		return false
	}
	logrus.Infof("Generated cover: %s", coverFile)

	if err := worker.registerCoverAsset(ctx, asset, coverFile); err != nil {
		logrus.Errorf("register cover failed: %s", err)
		return false
	}
	return true
}
// registerCoverAsset stores the generated cover image and records it as a
// ready image asset whose SourceAssetID points at asset.
//
// It is a no-op (nil) when asset is nil or coverFile is empty, and returns the
// stat error when coverFile does not exist. If an image asset derived from
// this source already exists, nothing new is registered and the freshly
// generated coverFile is discarded. Otherwise the file is hashed, placed under
// the key produced by buildObjectKey (moved on disk for the "local" provider,
// uploaded via storage.PutObject for any other), and a new asset row is
// created. On success coverFile no longer exists at its original path unless
// it already was the destination path.
func (worker *MediaProcessWorker) registerCoverAsset(ctx context.Context, asset *models.MediaAsset, coverFile string) error {
	if asset == nil || coverFile == "" {
		return nil
	}
	if _, err := os.Stat(coverFile); err != nil {
		return err
	}

	isLocal := strings.EqualFold(asset.Provider, "local")
	// localObjectPath maps an object key to its on-disk location. It reads the
	// storage config, so it is only invoked for the local provider.
	localObjectPath := func(key string) string {
		root := worker.storage.Config.LocalPath
		if root == "" {
			root = "./storage"
		}
		return filepath.Join(root, filepath.FromSlash(key))
	}

	tbl, q := models.MediaAssetQuery.QueryContext(ctx)
	existing, err := q.Where(
		tbl.SourceAssetID.Eq(asset.ID),
		tbl.Type.Eq(consts.MediaAssetTypeImage),
	).First()
	if err == nil && existing != nil {
		// A cover is already registered. Every successful path below consumes
		// coverFile, so drop the scratch file here as well instead of leaving
		// an orphan behind; never remove the registered cover's own file.
		// Removal is best-effort: a leftover scratch file is harmless.
		if !isLocal || coverFile != localObjectPath(existing.ObjectKey) {
			_ = os.Remove(coverFile)
		}
		return nil
	}
	if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
		return err
	}

	hash, size, err := fileSHA256(coverFile)
	if err != nil {
		return err
	}

	// A missing tenant row is tolerated; buildObjectKey then falls back to its
	// default namespace.
	var tenant *models.Tenant
	if asset.TenantID > 0 {
		tenant, err = models.TenantQuery.WithContext(ctx).
			Where(models.TenantQuery.ID.Eq(asset.TenantID)).
			First()
		if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
			return err
		}
	}

	filename := asset.Meta.Data().Filename
	if filename == "" {
		filename = filepath.Base(asset.ObjectKey)
	}
	coverName := coverFilename(filename)
	objectKey := buildObjectKey(tenant, hash, coverName)

	if isLocal {
		dstPath := localObjectPath(objectKey)
		if coverFile != dstPath {
			if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil {
				return err
			}
			if _, err := os.Stat(dstPath); err == nil {
				// The key embeds the content hash, so an existing destination
				// is treated as the same image; just drop the duplicate
				// (best-effort, failure only leaves a scratch file).
				_ = os.Remove(coverFile)
			} else if err := os.Rename(coverFile, dstPath); err != nil {
				return err
			}
		}
	} else {
		if err := worker.storage.PutObject(ctx, objectKey, coverFile, "image/jpeg"); err != nil {
			return err
		}
		// The upload succeeded; removing the local copy is best-effort.
		_ = os.Remove(coverFile)
	}

	coverAsset := &models.MediaAsset{
		TenantID:      asset.TenantID,
		UserID:        asset.UserID,
		Type:          consts.MediaAssetTypeImage,
		Status:        consts.MediaAssetStatusReady,
		Provider:      asset.Provider,
		Bucket:        asset.Bucket,
		ObjectKey:     objectKey,
		Hash:          hash,
		Variant:       consts.MediaAssetVariantMain,
		SourceAssetID: asset.ID,
		Meta: types.NewJSONType(fields.MediaAssetMeta{
			Filename: coverName,
			Size:     size,
		}),
	}
	if err := models.MediaAssetQuery.WithContext(ctx).Create(coverAsset); err != nil {
		return err
	}
	return nil
}
// coverFilename derives the cover image name for a source file name: the
// final extension is stripped and "_cover.jpg" is appended. When nothing is
// left after stripping (empty name or a bare ".ext"), "cover" is used as the
// stem.
func coverFilename(filename string) string {
	ext := filepath.Ext(filename)
	stem := strings.TrimSuffix(filename, ext)
	if stem != "" {
		return stem + "_cover.jpg"
	}
	return "cover" + "_cover.jpg"
}
// buildObjectKey composes the slash-separated storage key
// "quyun/<tenant>/<hash><ext>". <tenant> is the tenant's UUID string, or
// "public" when tenant is nil or that string is empty; <ext> is the
// lower-cased extension of filename.
func buildObjectKey(tenant *models.Tenant, hash, filename string) string {
	namespace := "public"
	if tenant != nil {
		if id := tenant.UUID.String(); id != "" {
			namespace = id
		}
	}
	suffix := strings.ToLower(filepath.Ext(filename))
	return path.Join("quyun", namespace, hash+suffix)
}
// fileSHA256 streams the file at filename through SHA-256 and returns the
// lowercase hex digest together with the number of bytes read. If the file
// cannot be opened it returns ("", 0, err); if reading fails midway it returns
// an empty digest, the byte count read so far, and the error.
func fileSHA256(filename string) (string, int64, error) {
	src, openErr := os.Open(filename)
	if openErr != nil {
		return "", 0, openErr
	}
	defer src.Close()

	hasher := sha256.New()
	written, copyErr := io.Copy(hasher, src)
	if copyErr != nil {
		return "", written, copyErr
	}

	digest := hasher.Sum(nil)
	return hex.EncodeToString(digest), written, nil
}