package jobs

import (
	"context"
	"errors"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"time"

	"quyun/v2/app/jobs/args"
	"quyun/v2/database/models"
	"quyun/v2/pkg/consts"
	"quyun/v2/providers/storage"

	"github.com/riverqueue/river"
	log "github.com/sirupsen/logrus"
	"gorm.io/gorm"
)

// MediaProcessWorker is the River worker for args.MediaAssetProcessJob. It
// moves a media asset through the processing -> ready/failed status
// transition and, for locally stored videos, extracts a cover frame with
// ffmpeg.

// @provider(job)
type MediaProcessWorker struct {
	river.WorkerDefaults[args.MediaAssetProcessJob]

	// storage provides the local storage configuration (Config.LocalPath).
	storage *storage.Storage
}

// Work processes a single media asset job.
//
// Flow:
//  1. Load the asset by ID, additionally scoped to the tenant when the job
//     carries a positive TenantID. A missing record is logged and the job is
//     cancelled via river.JobCancel; any other lookup error is returned
//     unchanged.
//  2. Assets already in the ready or deleted state are left untouched and
//     the job succeeds immediately.
//  3. The asset is marked as processing.
//  4. Video assets on the "local" provider go through cover extraction; a
//     missing source file or an ffmpeg failure makes the final status
//     failed. Every other path keeps the final status ready.
//  5. The final status is persisted; the error of that update (if any) is
//     the return value.
func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.MediaAssetProcessJob]) error {
	params := job.Args

	// Step 1: look up the asset, restricted to the tenant when one is given.
	tbl, query := models.MediaAssetQuery.QueryContext(ctx)
	query = query.Where(tbl.ID.Eq(params.AssetID))
	if params.TenantID > 0 {
		query = query.Where(tbl.TenantID.Eq(params.TenantID))
	}

	asset, err := query.First()
	if errors.Is(err, gorm.ErrRecordNotFound) {
		log.Warnf("media asset not found: %d", params.AssetID)
		return river.JobCancel(err)
	}
	if err != nil {
		return err
	}

	// Nothing to do for assets that are already ready or deleted.
	switch asset.Status {
	case consts.MediaAssetStatusReady, consts.MediaAssetStatusDeleted:
		return nil
	}

	// Step 2: flag the asset as processing so the start of the work is visible.
	markProcessing := models.MediaAssetQuery.WithContext(ctx).
		Where(models.MediaAssetQuery.ID.Eq(asset.ID))
	if _, err = markProcessing.UpdateSimple(
		models.MediaAssetQuery.Status.Value(consts.MediaAssetStatusProcessing),
		models.MediaAssetQuery.UpdatedAt.Value(time.Now()),
	); err != nil {
		return err
	}

	// Step 3: video handling. Only locally stored videos are run through
	// ffmpeg; other providers are skipped with a warning.
	finalStatus := consts.MediaAssetStatusReady
	if asset.Type == consts.MediaAssetTypeVideo {
		if strings.ToLower(asset.Provider) != "local" {
			log.Warn("non-local provider, skipping ffmpeg processing")
		} else if !j.extractLocalCover(ctx, asset.ObjectKey) {
			finalStatus = consts.MediaAssetStatusFailed
		}
	}

	// Step 4: persist the final status.
	patch := &models.MediaAsset{
		Status:    finalStatus,
		UpdatedAt: time.Now(),
	}
	_, err = models.MediaAssetQuery.WithContext(ctx).
		Where(models.MediaAssetQuery.ID.Eq(asset.ID)).
		Updates(patch)
	return err
}

// extractLocalCover tries to write a single-frame JPEG cover next to the
// locally stored video identified by objectKey.
//
// It returns false when the asset has to be marked as failed (the source
// file cannot be stat'ed, or ffmpeg exits with an error) and true otherwise,
// including the case where ffmpeg is not installed and the step is skipped.
func (j *MediaProcessWorker) extractLocalCover(ctx context.Context, objectKey string) bool {
	// Fall back to ./storage when no local path is configured.
	root := j.storage.Config.LocalPath
	if root == "" {
		root = "./storage"
	}
	source := filepath.Join(root, objectKey)

	if _, err := os.Stat(source); err != nil {
		log.Errorf("media file missing: %s, err=%v", source, err)
		return false
	}

	// Without ffmpeg on PATH the step is skipped and counts as a success.
	if _, err := exec.LookPath("ffmpeg"); err != nil {
		log.Warn("ffmpeg not found, skipping real transcoding")
		return true
	}

	// The cover is written into the source file's directory as "<base name>.jpg".
	cover := filepath.Join(filepath.Dir(source), filepath.Base(objectKey+".jpg"))
	ffmpegArgs := []string{
		"-y",
		"-i", source,
		"-ss", "00:00:01.000",
		"-vframes", "1",
		cover,
	}

	output, err := exec.CommandContext(ctx, "ffmpeg", ffmpegArgs...).CombinedOutput()
	if err != nil {
		log.Errorf("ffmpeg failed: %s, output: %s", err, string(output))
		return false
	}

	log.Infof("Generated cover: %s", cover)
	// TODO: create a cover asset or write it into meta; to be taken over by a later pipeline stage.
	return true
}