package jobs

import (
	"context"
	"errors"
	"time"

	jobs_args "quyun/v2/app/jobs/args"
	"quyun/v2/app/services"

	. "github.com/riverqueue/river"
	log "github.com/sirupsen/logrus"
)

// Compile-time check that the worker satisfies river's Worker interface.
var _ Worker[jobs_args.MediaAssetProcessJob] = (*MediaAssetProcessJobWorker)(nil)

// MediaAssetProcessJobWorker performs the asynchronous processing of media
// assets (placeholder implementation).
//
// The current implementation is a stub: it does not integrate with any
// external transcoding/screenshot service; it only writes back meta and sets
// the status to ready.
//
// @provider(job)
type MediaAssetProcessJobWorker struct {
	WorkerDefaults[jobs_args.MediaAssetProcessJob]
}

// Work marks the media asset referenced by the job arguments as processed by
// calling services.MediaAsset.ProcessSuccess with stub pipeline metadata.
//
// It returns nil on success. When the service reports an error that
// services.IsMediaAssetProcessJobNonRetryableError classifies as
// non-retryable, the error is wrapped with JobCancel; any other error is
// returned unchanged. A nil job yields an error instead of a nil-pointer
// panic.
func (w *MediaAssetProcessJobWorker) Work(ctx context.Context, job *Job[jobs_args.MediaAssetProcessJob]) error {
	// Guard before touching job.Args: dereferencing a nil job would panic.
	if job == nil {
		return errors.New("media asset process job: nil job")
	}

	args := job.Args

	// Attempt is promoted from the embedded JobRow pointer, which may be nil
	// (e.g. a hand-built Job); fall back to 0 in that case.
	attempt := 0
	if job.JobRow != nil {
		attempt = job.Attempt
	}

	logger := log.WithFields(log.Fields{
		"job_kind":  args.Kind(),
		"tenant_id": args.TenantID,
		"asset_id":  args.AssetID,
		"attempt":   attempt,
	})
	logger.Info("jobs.media_asset_process.start")

	// Sample the clock once so the timestamp recorded in meta and the one
	// passed as the processed-at argument describe the same instant.
	now := time.Now().UTC()

	_, err := services.MediaAsset.ProcessSuccess(ctx, args.TenantID, args.AssetID, map[string]any{
		"process_pipeline": "stub",
		"worker_attempt":   attempt,
		"worker_ran_at":    now.Format(time.RFC3339Nano),
	}, now)
	if err != nil {
		if services.IsMediaAssetProcessJobNonRetryableError(err) {
			logger.WithError(err).Warn("jobs.media_asset_process.cancel")
			return JobCancel(err)
		}
		logger.WithError(err).Warn("jobs.media_asset_process.retry")
		return err
	}

	logger.Info("jobs.media_asset_process.ok")
	return nil
}