package jobs import ( "context" "time" "github.com/riverqueue/river" "quyun/v2/app/jobs/args" "quyun/v2/database/models" "quyun/v2/pkg/consts" ) // @provider(job) type MediaProcessWorker struct { river.WorkerDefaults[args.MediaProcessArgs] } func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.MediaProcessArgs]) error { arg := job.Args // 1. Fetch Asset asset, err := models.MediaAssetQuery.WithContext(ctx).Where(models.MediaAssetQuery.ID.Eq(arg.AssetID)).First() if err != nil { return err } // 2. Mock Processing // Update status to processing _, err = models.MediaAssetQuery.WithContext(ctx).Where(models.MediaAssetQuery.ID.Eq(asset.ID)).UpdateSimple(models.MediaAssetQuery.Status.Value(consts.MediaAssetStatusProcessing)) if err != nil { return err } // 3. Update status to ready _, err = models.MediaAssetQuery.WithContext(ctx).Where(models.MediaAssetQuery.ID.Eq(asset.ID)).Updates(&models.MediaAsset{ Status: consts.MediaAssetStatusReady, UpdatedAt: time.Now(), }) return err }