package jobs import ( "context" "crypto/md5" "encoding/hex" "errors" "io" "os" "os/exec" "path" "path/filepath" "strings" "time" "quyun/v2/app/jobs/args" "quyun/v2/database/fields" "quyun/v2/database/models" "quyun/v2/pkg/consts" "quyun/v2/providers/storage" "github.com/riverqueue/river" log "github.com/sirupsen/logrus" "go.ipao.vip/gen/types" "gorm.io/gorm" ) // @provider(job) type MediaProcessWorker struct { river.WorkerDefaults[args.MediaAssetProcessJob] storage *storage.Storage } func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.MediaAssetProcessJob]) error { arg := job.Args // 1. 获取媒体资源,保证租户隔离。 tbl, q := models.MediaAssetQuery.QueryContext(ctx) q = q.Where(tbl.ID.Eq(arg.AssetID)) if arg.TenantID > 0 { q = q.Where(tbl.TenantID.Eq(arg.TenantID)) } asset, err := q.First() if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { log.Warnf("media asset not found: %d", arg.AssetID) return river.JobCancel(err) } return err } if asset.Status == consts.MediaAssetStatusReady || asset.Status == consts.MediaAssetStatusDeleted { return nil } // 2. 更新状态为处理中,标识处理已开始。 if err := models.MediaAssetQuery.WithContext(ctx). UnderlyingDB(). Model(&models.MediaAsset{}). Where("id = ?", asset.ID). Updates(map[string]any{ "status": consts.MediaAssetStatusProcessing, "updated_at": time.Now(), }).Error; err != nil { return err } finalStatus := consts.MediaAssetStatusReady // 3. 处理视频(FFmpeg,未安装时走模拟流程)。 if asset.Type == consts.MediaAssetTypeVideo { if strings.ToLower(asset.Provider) == "local" { localPath := j.storage.Config.LocalPath if localPath == "" { localPath = "./storage" } inputFile := filepath.Join(localPath, asset.ObjectKey) if _, err := os.Stat(inputFile); err != nil { log.Errorf("media file missing: %s, err=%v", inputFile, err) finalStatus = consts.MediaAssetStatusFailed } else if _, err := exec.LookPath("ffmpeg"); err != nil { log.Warn("ffmpeg not found, skipping real transcoding") } else { outputDir := filepath.Dir(inputFile) coverTempKey := asset.ObjectKey + ".jpg" coverFile := filepath.Join(outputDir, filepath.Base(coverTempKey)) // 生成封面,作为后续管线的占位输出。 cmd := exec.CommandContext( ctx, "ffmpeg", "-y", "-i", inputFile, "-ss", "00:00:01.000", "-vframes", "1", coverFile, ) if out, err := cmd.CombinedOutput(); err != nil { log.Errorf("ffmpeg failed: %s, output: %s", err, string(out)) finalStatus = consts.MediaAssetStatusFailed } else { log.Infof("Generated cover: %s", coverFile) // 生成封面资产记录,便于后台可追踪产物。 if err := j.registerCoverAsset(ctx, asset, coverFile); err != nil { log.Errorf("register cover failed: %s", err) finalStatus = consts.MediaAssetStatusFailed } } } } else { log.Warn("non-local provider, skipping ffmpeg processing") } } // 4. 更新最终状态。 if err := models.MediaAssetQuery.WithContext(ctx). UnderlyingDB(). Model(&models.MediaAsset{}). Where("id = ?", asset.ID). Updates(map[string]any{ "status": finalStatus, "updated_at": time.Now(), }).Error; err != nil { return err } return nil } func (j *MediaProcessWorker) registerCoverAsset(ctx context.Context, asset *models.MediaAsset, coverFile string) error { if asset == nil || coverFile == "" { return nil } if _, err := os.Stat(coverFile); err != nil { return err } // 已存在封面派生资产时直接跳过。 tbl, q := models.MediaAssetQuery.QueryContext(ctx) existing, err := q.Where( tbl.SourceAssetID.Eq(asset.ID), tbl.Type.Eq(consts.MediaAssetTypeImage), ).First() if err == nil && existing != nil { return nil } if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { return err } hash, size, err := fileMD5(coverFile) if err != nil { return err } var tenant *models.Tenant if asset.TenantID > 0 { tenant, err = models.TenantQuery.WithContext(ctx). Where(models.TenantQuery.ID.Eq(asset.TenantID)). First() if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { return err } } filename := asset.Meta.Data().Filename if filename == "" { filename = filepath.Base(asset.ObjectKey) } coverName := coverFilename(filename) objectKey := buildObjectKey(tenant, hash, coverName) // 本地存储将文件移动到目标 objectKey 位置,保持路径规范。 localPath := j.storage.Config.LocalPath if localPath == "" { localPath = "./storage" } dstPath := filepath.Join(localPath, filepath.FromSlash(objectKey)) if coverFile != dstPath { if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil { return err } if _, err := os.Stat(dstPath); err == nil { _ = os.Remove(coverFile) } else if err := os.Rename(coverFile, dstPath); err != nil { return err } } coverAsset := &models.MediaAsset{ TenantID: asset.TenantID, UserID: asset.UserID, Type: consts.MediaAssetTypeImage, Status: consts.MediaAssetStatusReady, Provider: asset.Provider, Bucket: asset.Bucket, ObjectKey: objectKey, Hash: hash, Variant: consts.MediaAssetVariantMain, SourceAssetID: asset.ID, Meta: types.NewJSONType(fields.MediaAssetMeta{ Filename: coverName, Size: size, }), } if err := models.MediaAssetQuery.WithContext(ctx).Create(coverAsset); err != nil { return err } return nil } func coverFilename(filename string) string { base := strings.TrimSuffix(filename, filepath.Ext(filename)) if base == "" { base = "cover" } return base + "_cover.jpg" } func buildObjectKey(tenant *models.Tenant, hash, filename string) string { // 按租户维度组织对象路径:quyun//. tenantUUID := "public" if tenant != nil && tenant.UUID.String() != "" { tenantUUID = tenant.UUID.String() } ext := strings.ToLower(filepath.Ext(filename)) return path.Join("quyun", tenantUUID, hash+ext) } func fileMD5(filename string) (string, int64, error) { f, err := os.Open(filename) if err != nil { return "", 0, err } defer f.Close() h := md5.New() size, err := io.Copy(h, f) if err != nil { return "", size, err } return hex.EncodeToString(h.Sum(nil)), size, nil }