package jobs

import (
	"bytes"
	"context"
	"os"
	"os/exec"
	"strings"
	"time"

	"backend/app/http/medias"
	"backend/app/http/posts"
	"backend/app/http/storages"
	"backend/database/fields"
	"backend/database/models/qvyun_v2/public/model"
	"backend/pkg/utils"
	"backend/pkg/utils/fs"

	_ "git.ipao.vip/rogeecn/atom"
	_ "git.ipao.vip/rogeecn/atom/contracts"
	"github.com/pkg/errors"
	. "github.com/riverqueue/river"
	"github.com/samber/lo"
	"github.com/sirupsen/logrus"
)

// Compile-time checks that the job args satisfy the river interfaces.
var (
	_ JobArgs               = (*PostVideoExtractAudioJob)(nil)
	_ JobArgsWithInsertOpts = (*PostVideoExtractAudioJob)(nil)
)

// PostVideoExtractAudioJob is the argument payload of the job that extracts an
// audio track from a post's video and attaches it to the post.
type PostVideoExtractAudioJob struct {
	PostID   int64  // post that receives the audio asset
	TenantID int64  // tenant used for the media lookup and all writes
	UserID   int64  // user used for the media lookup and all writes
	Hash     string // hash of the source video media
	Mark     string // asset mark; the worker handles "audio-preview" and "audio"
}

// InsertOpts implements JobArgsWithInsertOpts.
//
// Jobs go to the default queue with default priority and are made unique by
// their arguments.
func (PostVideoExtractAudioJob) InsertOpts() InsertOpts {
	return InsertOpts{
		Queue:    QueueDefault,
		Priority: PriorityDefault,
		UniqueOpts: UniqueOpts{
			ByArgs: true,
		},
	}
}

// Kind implements JobArgs and returns the job kind identifier.
func (PostVideoExtractAudioJob) Kind() string {
	return "PostVideoExtractAudioJob"
}

// Compile-time check that the worker satisfies the river Worker interface.
var _ Worker[PostVideoExtractAudioJob] = (*PostVideoExtractAudioJobWorker)(nil)

// @provider(job)
type PostVideoExtractAudioJobWorker struct {
	WorkerDefaults[PostVideoExtractAudioJob]

	log *logrus.Entry `inject:"false"` // set in Prepare, not injected

	postSvc    *posts.Service
	mediaSvc   *medias.Service
	storageSvc *storages.Service
}

// Prepare initialises the worker's logger. It always returns nil.
func (w *PostVideoExtractAudioJobWorker) Prepare() error {
	w.log = logrus.WithField("worker", "PostVideoExtractAudioJobWorker")
	return nil
}

// NextRetry schedules every retry five seconds from now, regardless of the
// job's attempt count.
func (w *PostVideoExtractAudioJobWorker) NextRetry(job *Job[PostVideoExtractAudioJob]) time.Time {
	return time.Now().Add(5 * time.Second)
}

// Work extracts the audio track of the video identified by job.Args.Hash,
// stores it as a new media record and attaches it to the post as an audio
// asset carrying job.Args.Mark.
//
// If the post already has an audio asset with the same mark the job is a
// no-op. For the "audio-preview" mark only the first minute is extracted.
// Any failing step returns a wrapped error.
func (w *PostVideoExtractAudioJobWorker) Work(ctx context.Context, job *Job[PostVideoExtractAudioJob]) error {
	args := job.Args

	post, err := w.postSvc.GetPostByID(ctx, args.PostID)
	if err != nil {
		return errors.Wrapf(err, "get post(%d) failed", args.PostID)
	}

	media, err := w.mediaSvc.GetMediaByHash(ctx, args.TenantID, args.UserID, args.Hash)
	if err != nil {
		return errors.Wrapf(err, "get media by hash(%s) failed", args.Hash)
	}
	videoPath := media.Path

	// Nothing to do when an audio asset with this mark is already attached.
	_, ok := lo.Find(post.Assets.Data, func(asset fields.MediaAsset) bool {
		return asset.Type == fields.MediaAssetTypeAudio && asset.Mark != nil && *asset.Mark == args.Mark
	})
	if ok {
		return nil
	}

	// Only swap the extension. Replacing every ".mp4" in the path would also
	// rewrite directory names, and a non-mp4 input would yield an output path
	// identical to the input.
	audioPath := strings.TrimSuffix(videoPath, ".mp4") + ".mp3"

	// A nil duration means the full-length audio is extracted.
	var duration *string
	if args.Mark == "audio-preview" {
		duration = lo.ToPtr("00:01:00")
	}

	if err := w.extractAudioFromVideo(ctx, videoPath, audioPath, duration); err != nil {
		return errors.Wrap(err, "extract audio from video failed")
	}

	fileMd5, err := utils.FileMd5(audioPath)
	if err != nil {
		return errors.Wrapf(err, "get audio(%s) file md5 failed", audioPath)
	}

	// Name the audio file after its own hash instead of the video's hash.
	finalPath := strings.Replace(audioPath, args.Hash, fileMd5, 1)
	if err := os.Rename(audioPath, finalPath); err != nil {
		return errors.Wrapf(err, "rename audio(%s) file failed", audioPath)
	}

	storage, err := w.storageSvc.GetDefault(ctx)
	if err != nil {
		return errors.Wrap(err, "get default storage failed")
	}

	// Save to medias. Size and Path must refer to the renamed file: the
	// pre-rename path no longer exists once the rename has moved it.
	_, err = w.mediaSvc.Create(ctx, &model.Medias{
		TenantID:  args.TenantID,
		UserID:    args.UserID,
		PostID:    post.ID,
		StorageID: storage.ID,
		Hash:      fileMd5,
		Name:      post.Title,
		MimeType:  "audio/mp3",
		Size:      fs.FileSize(finalPath),
		Path:      finalPath,
	})
	if err != nil {
		return errors.Wrap(err, "create media failed")
	}

	assets := []fields.MediaAsset{
		{
			Type: fields.MediaAssetTypeAudio,
			Hash: fileMd5,
			Mark: lo.ToPtr(args.Mark),
		},
	}
	if err := w.postSvc.AttachAssets(ctx, args.TenantID, args.UserID, post.ID, assets); err != nil {
		return errors.Wrapf(err, "attach audio(%s) to post(%d) failed", finalPath, post.ID)
	}

	// Set this worker's completion bit. `&` and `<<` share precedence in Go,
	// so the previous `mark & 1 << n` evaluated as `(mark & 1) << n` and
	// discarded the existing flags instead of adding one.
	// NOTE(review): assumes WorkerMark is a bitmask of finished workers — confirm.
	switch args.Mark {
	case "audio-preview":
		post.Meta.WorkerMark |= 1 << 1
	case "audio":
		post.Meta.WorkerMark |= 1 << 2
	}

	if err := w.postSvc.UpdateMeta(ctx, args.TenantID, args.UserID, post.ID, post.Meta); err != nil {
		return errors.Wrapf(err, "update post(%d) meta failed", post.ID)
	}

	return nil
}

// extractAudioFromVideo extracts audio from video.
func (w *PostVideoExtractAudioJobWorker) extractAudioFromVideo(ctx context.Context, videoPath, audioPath string, duration *string) error { args := []string{ "-i", videoPath, "-vn", "-acodec", "libmp3lame", "-ar", "44100", "-b:a", "128k", "-q:a", "2", } if duration != nil { args = append(args, "-t", *duration) } args = append(args, audioPath) w.log.Infof("extractAudioFromVideo: ffmpeg %s", strings.Join(args, " ")) cmd := exec.CommandContext(ctx, "ffmpeg", args...) var buf bytes.Buffer logWriter := utils.NewLogBuffer(func(line string) { w.log.Info(line) }) cmd.Stdout = utils.NewCombinedBuffer(&buf, logWriter) cmd.Stderr = utils.NewCombinedBuffer(&buf, logWriter) if err := cmd.Run(); err != nil { return errors.Wrapf(err, "extract audio failed: %s\n%s", videoPath, buf.String()) } return nil }