From 7ebbc01ae2d4e067c20a1c5310458ab38ee01e63 Mon Sep 17 00:00:00 2001 From: yanghao05 Date: Thu, 17 Apr 2025 20:57:31 +0800 Subject: [PATCH] feat: add video processing jobs --- backend/app/jobs/download_from_alioss.go | 2 + backend/app/jobs/download_from_alioss_test.go | 1 + backend/app/jobs/extract_audio_from_video.go | 77 +++++++++++++++++++ .../app/jobs/extract_head_image_from_video.go | 77 +++++++++++++++++++ 4 files changed, 157 insertions(+) create mode 100644 backend/app/jobs/extract_audio_from_video.go create mode 100644 backend/app/jobs/extract_head_image_from_video.go diff --git a/backend/app/jobs/download_from_alioss.go b/backend/app/jobs/download_from_alioss.go index 83b489a..fd96c69 100644 --- a/backend/app/jobs/download_from_alioss.go +++ b/backend/app/jobs/download_from_alioss.go @@ -9,6 +9,7 @@ import ( "quyun/app/models" "quyun/providers/ali" + "quyun/providers/job" . "github.com/riverqueue/river" log "github.com/sirupsen/logrus" @@ -40,6 +41,7 @@ type DownloadFromAliOSSWorker struct { WorkerDefaults[DownloadFromAliOSS] oss *ali.OSSClient + job *job.Job } func (w *DownloadFromAliOSSWorker) NextRetry(job *Job[DownloadFromAliOSS]) time.Time { diff --git a/backend/app/jobs/download_from_alioss_test.go b/backend/app/jobs/download_from_alioss_test.go index f4fba77..123c578 100644 --- a/backend/app/jobs/download_from_alioss_test.go +++ b/backend/app/jobs/download_from_alioss_test.go @@ -50,6 +50,7 @@ func (t *DownloadFromAliOSSSuite) Test_Work() { worker := &DownloadFromAliOSSWorker{ oss: t.Oss, + job: t.Job, } err := worker.Work(context.Background(), job) diff --git a/backend/app/jobs/extract_audio_from_video.go b/backend/app/jobs/extract_audio_from_video.go new file mode 100644 index 0000000..0390e08 --- /dev/null +++ b/backend/app/jobs/extract_audio_from_video.go @@ -0,0 +1,77 @@ +package jobs + +import ( + "context" + "time" + + "quyun/app/models" + "quyun/providers/ali" + "quyun/providers/job" + + . "github.com/riverqueue/river" + log "github.com/sirupsen/logrus" + _ "go.ipao.vip/atom" + "go.ipao.vip/atom/contracts" +) + +var _ contracts.JobArgs = (*WechatCallback)(nil) + +type ExtractAudioFromVideo struct { + MediaID int64 `json:"media_id"` +} + +func (s ExtractAudioFromVideo) InsertOpts() InsertOpts { + return InsertOpts{ + Queue: QueueDefault, + Priority: PriorityDefault, + } +} + +func (s ExtractAudioFromVideo) Kind() string { return "extract_audio_from_video" } +func (a ExtractAudioFromVideo) UniqueID() string { return a.Kind() } + +var _ Worker[ExtractAudioFromVideo] = (*ExtractAudioFromVideoWorker)(nil) + +// @provider(job) +type ExtractAudioFromVideoWorker struct { + WorkerDefaults[ExtractAudioFromVideo] + + oss *ali.OSSClient + job *job.Job +} + +func (w *ExtractAudioFromVideoWorker) NextRetry(job *Job[ExtractAudioFromVideo]) time.Time { + return time.Now().Add(30 * time.Second) +} + +func (w *ExtractAudioFromVideoWorker) Work(ctx context.Context, job *Job[ExtractAudioFromVideo]) error { + log := log.WithField("job", job.Args.Kind()) + + log.Infof("[Start] Working on job with strings: %+v", job.Args) + defer log.Infof("[End] Finished %s", job.Args.Kind()) + + media, err := models.Medias.GetByID(ctx, job.Args.MediaID) + if err != nil { + log.Errorf("Error getting media by ID: %v", err) + return JobCancel(err) + } + _ = media + + // TODO + + // path := "/Users/rogee/Projects/self/quyun/backend/fixtures/oss/" + // dst := filepath.Join(path, media.Path) + + // // use ffmpeg to extract audio from video + // audioPath := filepath.Join(path, media.Hash+".mp3") + + // cmd := exec.Command("ffmpeg", "-i", dst, audioPath) + // if err := cmd.Run(); err != nil { + // log.Errorf("Error extracting audio: %v", err) + // return err + // } + + // log.Infof("Successfully extracted audio to: %s", audioPath) + + return nil +} diff --git a/backend/app/jobs/extract_head_image_from_video.go b/backend/app/jobs/extract_head_image_from_video.go new file mode 100644 index 0000000..a82455c --- /dev/null +++ b/backend/app/jobs/extract_head_image_from_video.go @@ -0,0 +1,77 @@ +package jobs + +import ( + "context" + "time" + + "quyun/app/models" + "quyun/providers/ali" + "quyun/providers/job" + + . "github.com/riverqueue/river" + log "github.com/sirupsen/logrus" + _ "go.ipao.vip/atom" + "go.ipao.vip/atom/contracts" +) + +var _ contracts.JobArgs = (*WechatCallback)(nil) + +type ExtractHeadImageFromVideo struct { + MediaID int64 `json:"media_id"` +} + +func (s ExtractHeadImageFromVideo) InsertOpts() InsertOpts { + return InsertOpts{ + Queue: QueueDefault, + Priority: PriorityDefault, + } +} + +func (s ExtractHeadImageFromVideo) Kind() string { return "extract_head_image_from_video" } +func (a ExtractHeadImageFromVideo) UniqueID() string { return a.Kind() } + +var _ Worker[ExtractHeadImageFromVideo] = (*ExtractHeadImageFromVideoWorker)(nil) + +// @provider(job) +type ExtractHeadImageFromVideoWorker struct { + WorkerDefaults[ExtractHeadImageFromVideo] + + oss *ali.OSSClient + job *job.Job +} + +func (w *ExtractHeadImageFromVideoWorker) NextRetry(job *Job[ExtractHeadImageFromVideo]) time.Time { + return time.Now().Add(30 * time.Second) +} + +func (w *ExtractHeadImageFromVideoWorker) Work(ctx context.Context, job *Job[ExtractHeadImageFromVideo]) error { + log := log.WithField("job", job.Args.Kind()) + + log.Infof("[Start] Working on job with strings: %+v", job.Args) + defer log.Infof("[End] Finished %s", job.Args.Kind()) + + media, err := models.Medias.GetByID(ctx, job.Args.MediaID) + if err != nil { + log.Errorf("Error getting media by ID: %v", err) + return JobCancel(err) + } + _ = media + + // TODO + + // path := "/Users/rogee/Projects/self/quyun/backend/fixtures/oss/" + // dst := filepath.Join(path, media.Path) + + // // use ffmpeg to extract audio from video + // audioPath := filepath.Join(path, media.Hash+".mp3") + + // cmd := exec.Command("ffmpeg", "-i", dst, audioPath) + // if err := cmd.Run(); err != nil { + // log.Errorf("Error extracting audio: %v", err) + // return err + // } + + // log.Infof("Successfully extracted audio to: %s", audioPath) + + return nil +}