feat: update video cut and extract head image job

yanghao05
2025-04-22 10:37:57 +08:00
parent 326a9e523b
commit 284531d10e
15 changed files with 474 additions and 136 deletions

View File

@@ -93,5 +93,15 @@ func (w *DownloadFromAliOSSWorker) Work(ctx context.Context, job *Job[DownloadFr
log.Infof("Successfully downloaded file: %s", media.Path) log.Infof("Successfully downloaded file: %s", media.Path)
if err := w.job.Add(&VideoCut{MediaID: job.Args.MediaID}); err != nil {
log.Errorf("Error adding job: %v", err)
return err
}
if err := w.job.Add(&VideoExtractHeadImage{MediaID: job.Args.MediaID}); err != nil {
log.Errorf("Error adding job: %v", err)
return err
}
return nil
}

View File

@@ -1,79 +0,0 @@
package jobs
import (
"context"
"time"
"quyun/app/models"
"quyun/providers/ali"
"quyun/providers/app"
"quyun/providers/job"
. "github.com/riverqueue/river"
log "github.com/sirupsen/logrus"
_ "go.ipao.vip/atom"
"go.ipao.vip/atom/contracts"
)
var _ contracts.JobArgs = (*WechatCallback)(nil)
type ExtractHeadImageFromVideo struct {
MediaID int64 `json:"media_id"`
}
func (s ExtractHeadImageFromVideo) InsertOpts() InsertOpts {
return InsertOpts{
Queue: QueueDefault,
Priority: PriorityDefault,
}
}
func (s ExtractHeadImageFromVideo) Kind() string { return "extract_head_image_from_video" }
func (a ExtractHeadImageFromVideo) UniqueID() string { return a.Kind() }
var _ Worker[ExtractHeadImageFromVideo] = (*ExtractHeadImageFromVideoWorker)(nil)
// @provider(job)
type ExtractHeadImageFromVideoWorker struct {
WorkerDefaults[ExtractHeadImageFromVideo]
oss *ali.OSSClient
job *job.Job
app *app.Config
}
func (w *ExtractHeadImageFromVideoWorker) NextRetry(job *Job[ExtractHeadImageFromVideo]) time.Time {
return time.Now().Add(30 * time.Second)
}
func (w *ExtractHeadImageFromVideoWorker) Work(ctx context.Context, job *Job[ExtractHeadImageFromVideo]) error {
log := log.WithField("job", job.Args.Kind())
log.Infof("[Start] Working on job with strings: %+v", job.Args)
defer log.Infof("[End] Finished %s", job.Args.Kind())
media, err := models.Medias.GetByID(ctx, job.Args.MediaID)
if err != nil {
log.Errorf("Error getting media by ID: %v", err)
return JobCancel(err)
}
_ = media
// TODO
// path := "/Users/rogee/Projects/self/quyun/backend/fixtures/oss/"
// dst := filepath.Join(path, media.Path)
// // use ffmpeg to extract audio from video
// audioPath := filepath.Join(path, media.Hash+".mp3")
// cmd := exec.Command("ffmpeg", "-i", dst, audioPath)
// if err := cmd.Run(); err != nil {
// log.Errorf("Error extracting audio: %v", err)
// return err
// }
// log.Infof("Successfully extracted audio to: %s", audioPath)
return nil
}

View File

@@ -60,10 +60,12 @@ func Provide(opts ...opt.Option) error {
}
if err := container.Container.Provide(func(
__job *job.Job,
+ app *app.Config,
job *job.Job,
oss *ali.OSSClient,
) (contracts.Initial, error) {
- obj := &ExtractAudioFromVideoWorker{
+ obj := &VideoCutWorker{
+ app: app,
job: job,
oss: oss,
}
@@ -81,7 +83,26 @@ func Provide(opts ...opt.Option) error {
job *job.Job,
oss *ali.OSSClient,
) (contracts.Initial, error) {
- obj := &ExtractHeadImageFromVideoWorker{
+ obj := &VideoExtractHeadImageWorker{
+ app: app,
+ job: job,
+ oss: oss,
+ }
+ if err := river.AddWorkerSafely(__job.Workers, obj); err != nil {
+ return nil, err
+ }
+ return obj, nil
+ }, atom.GroupInitial); err != nil {
+ return err
+ }
+ if err := container.Container.Provide(func(
+ __job *job.Job,
+ app *app.Config,
+ job *job.Job,
+ oss *ali.OSSClient,
+ ) (contracts.Initial, error) {
+ obj := &VideoStoreShortWorker{
app: app,
job: job,
oss: oss,

View File

@@ -1,13 +1,13 @@
package jobs
import (
- "bufio"
"context"
- "os/exec"
"path/filepath"
"time"
"quyun/app/models"
+ "quyun/database/fields"
+ "quyun/pkg/utils"
"quyun/providers/ali"
"quyun/providers/app"
"quyun/providers/job"
@@ -16,7 +16,6 @@ import (
log "github.com/sirupsen/logrus"
_ "go.ipao.vip/atom"
"go.ipao.vip/atom/contracts"
- "golang.org/x/sync/errgroup"
)
var _ contracts.JobArgs = (*VideoCut)(nil)
@@ -63,53 +62,34 @@ func (w *VideoCutWorker) Work(ctx context.Context, job *Job[VideoCut]) error {
}
input := filepath.Join(w.app.StoragePath, media.Path)
- output := input[:len(input)-len(filepath.Ext(input))] + "-output" + filepath.Ext(input)
+ output := input[:len(input)-len(filepath.Ext(input))] + "-short" + filepath.Ext(input)
log.Infof("cut video process %s to %s", input, output)
- cmd := exec.Command("ffmpeg", "-ss", "00:00:00", "-i", input, "-to", "00:01:00", "-c", "copy", output)
- stdout, err := cmd.StdoutPipe()
- if err != nil {
- log.Errorf("Error creating stdout pipe: %v", err)
- return err
- }
- stderr, err := cmd.StderrPipe()
- if err != nil {
- log.Errorf("Error creating stderr pipe: %v", err)
- return err
- }
- if err := cmd.Start(); err != nil {
- log.Errorf("Error starting command: %v", err)
- return err
- }
- var eg errgroup.Group
- eg.Go(func() error {
- scanner := bufio.NewScanner(stdout)
- for scanner.Scan() {
- log.Info(scanner.Text())
- }
- return nil
- })
- eg.Go(func() error {
- scanner := bufio.NewScanner(stderr)
- for scanner.Scan() {
- log.Error(scanner.Text())
- }
- return nil
- })
- if err := cmd.Wait(); err != nil {
- log.Errorf("Error waiting for command: %v", err)
- return err
- }
- if err := eg.Wait(); err != nil {
- log.Errorf("Error waiting for command: %v", err)
- return err
- }
- return nil
+ if err := utils.CutMedia(input, output, 0, 60); err != nil {
+ log.Errorf("Error cutting media: %v", err)
+ return JobCancel(err)
+ }
+ duration, err := utils.GetMediaDuration(input)
+ if err != nil {
+ log.Errorf("Error getting media duration: %v", err)
+ return JobCancel(err)
+ }
+ // update media metas
+ metas := fields.MediaMetas{
+ ParentHash: "",
+ Short: false,
+ Duration: duration,
+ }
+ if err := models.Medias.UpdateMetas(ctx, media.ID, metas); err != nil {
+ log.Errorf("Error updating media metas: %v", err)
+ return JobCancel(err)
+ }
+ // save to database
+ return w.job.Add(&VideoStoreShort{
+ MediaID: media.ID,
+ FilePath: output,
+ })
}

View File

@@ -0,0 +1,111 @@
package jobs
import (
"context"
"os"
"path/filepath"
"time"
"quyun/app/models"
"quyun/database/fields"
"quyun/database/schemas/public/model"
"quyun/pkg/utils"
"quyun/providers/ali"
"quyun/providers/app"
"quyun/providers/job"
. "github.com/riverqueue/river"
log "github.com/sirupsen/logrus"
_ "go.ipao.vip/atom"
"go.ipao.vip/atom/contracts"
)
var _ contracts.JobArgs = (*VideoExtractHeadImage)(nil)
type VideoExtractHeadImage struct {
MediaID int64 `json:"media_id"`
}
func (s VideoExtractHeadImage) InsertOpts() InsertOpts {
return InsertOpts{
Queue: QueueDefault,
Priority: PriorityDefault,
}
}
func (s VideoExtractHeadImage) Kind() string { return "video_extract_head_image" }
func (a VideoExtractHeadImage) UniqueID() string { return a.Kind() }
var _ Worker[VideoExtractHeadImage] = (*VideoExtractHeadImageWorker)(nil)
// @provider(job)
type VideoExtractHeadImageWorker struct {
WorkerDefaults[VideoExtractHeadImage]
oss *ali.OSSClient
job *job.Job
app *app.Config
}
func (w *VideoExtractHeadImageWorker) NextRetry(job *Job[VideoExtractHeadImage]) time.Time {
return time.Now().Add(30 * time.Second)
}
func (w *VideoExtractHeadImageWorker) Work(ctx context.Context, job *Job[VideoExtractHeadImage]) error {
log := log.WithField("job", job.Args.Kind())
log.Infof("[Start] Working on job with strings: %+v", job.Args)
defer log.Infof("[End] Finished %s", job.Args.Kind())
media, err := models.Medias.GetByID(ctx, job.Args.MediaID)
if err != nil {
log.Errorf("Error getting media by ID: %v", err)
return JobCancel(err)
}
input := filepath.Join(w.app.StoragePath, media.Path)
output := input[:len(input)-len(filepath.Ext(input))] + ".jpg"
if err := utils.GetFrameImageFromVideo(input, output, 1); err != nil {
log.Errorf("Error extracting image from video: %v", err)
return JobCancel(err)
}
defer os.RemoveAll(output)
// Upload the image to OSS
if err := w.oss.Upload(ctx, output, filepath.Base(output)); err != nil {
log.Errorf("Error uploading image to OSS: %v", err)
return JobCancel(err)
}
fileSize, err := utils.GetFileSize(output)
if err != nil {
log.Errorf("Error getting file size: %v", err)
return JobCancel(err)
}
fileMd5, err := utils.GetFileMd5(output)
if err != nil {
log.Errorf("Error getting file MD5: %v", err)
return JobCancel(err)
}
// create a new media record for the image
imageMedia := &model.Medias{
CreatedAt: time.Now(),
Name: "[展示图]" + media.Name,
MimeType: "image/jpeg",
Size: fileSize,
Path: w.oss.GetSavePath(filepath.Base(output)),
Hash: fileMd5,
Metas: fields.Json[fields.MediaMetas]{},
}
if err := models.Medias.Create(ctx, imageMedia); err != nil {
log.Errorf("Error creating media record: %v", err)
return JobCancel(err)
}
return nil
}

View File

@@ -0,0 +1,111 @@
package jobs
import (
"context"
"path/filepath"
"time"
"quyun/app/models"
"quyun/database/fields"
"quyun/database/schemas/public/model"
"quyun/pkg/utils"
"quyun/providers/ali"
"quyun/providers/app"
"quyun/providers/job"
. "github.com/riverqueue/river"
log "github.com/sirupsen/logrus"
_ "go.ipao.vip/atom"
"go.ipao.vip/atom/contracts"
)
var _ contracts.JobArgs = (*VideoStoreShort)(nil)
type VideoStoreShort struct {
MediaID int64 `json:"media_id"`
FilePath string `json:"file_path"`
}
func (s VideoStoreShort) InsertOpts() InsertOpts {
return InsertOpts{
Queue: QueueDefault,
Priority: PriorityDefault,
}
}
func (s VideoStoreShort) Kind() string { return "video_store_short" }
func (a VideoStoreShort) UniqueID() string { return a.Kind() }
var _ Worker[VideoStoreShort] = (*VideoStoreShortWorker)(nil)
// @provider(job)
type VideoStoreShortWorker struct {
WorkerDefaults[VideoStoreShort]
oss *ali.OSSClient
job *job.Job
app *app.Config
}
func (w *VideoStoreShortWorker) NextRetry(job *Job[VideoStoreShort]) time.Time {
return time.Now().Add(30 * time.Second)
}
func (w *VideoStoreShortWorker) Work(ctx context.Context, job *Job[VideoStoreShort]) error {
log := log.WithField("job", job.Args.Kind())
log.Infof("[Start] Working on job with strings: %+v", job.Args)
defer log.Infof("[End] Finished %s", job.Args.Kind())
media, err := models.Medias.GetByID(ctx, job.Args.MediaID)
if err != nil {
log.Errorf("Error getting media by ID: %v", err)
return JobCancel(err)
}
duration, err := utils.GetMediaDuration(job.Args.FilePath)
if err != nil {
log.Errorf("Error getting media duration: %v", err)
return JobCancel(err)
}
// get file md5
fileMd5, err := utils.GetFileMd5(job.Args.FilePath)
if err != nil {
log.Errorf("Error getting file md5: %v", err)
return JobCancel(err)
}
filePath := w.oss.GetSavePath(fileMd5 + filepath.Ext(job.Args.FilePath))
// get file size
fileSize, err := utils.GetFileSize(job.Args.FilePath)
if err != nil {
log.Errorf("Error getting file size: %v", err)
return JobCancel(err)
}
// save to db and relate to master
mediaModel := &model.Medias{
CreatedAt: time.Now(),
Name: "[试听]" + media.Name,
MimeType: media.MimeType,
Size: fileSize,
Path: filePath,
Hash: fileMd5,
Metas: fields.ToJson(fields.MediaMetas{
ParentHash: media.Hash,
Short: true,
Duration: duration,
}),
}
if err := models.Medias.Create(ctx, mediaModel); err != nil {
log.Errorf("Error saving media record: %v data: %+v", err, mediaModel)
return err
}
log.Infof("Media record created with path: %s and hash: %s", filePath, fileMd5)
return nil
}

View File

@@ -5,6 +5,7 @@ import (
"time" "time"
"quyun/app/requests" "quyun/app/requests"
"quyun/database/fields"
"quyun/database/schemas/public/model" "quyun/database/schemas/public/model"
"quyun/database/schemas/public/table" "quyun/database/schemas/public/table"
@@ -215,3 +216,23 @@ func (m *mediasModel) Delete(ctx context.Context, id int64) error {
m.log.Infof("media item deleted successfully") m.log.Infof("media item deleted successfully")
return nil return nil
} }
// UpdateMetas replaces the metas jsonb column for the given media ID.
func (m *mediasModel) UpdateMetas(ctx context.Context, id int64, metas fields.MediaMetas) error {
meta := fields.ToJson(metas)
tbl := table.Medias
stmt := tbl.
UPDATE(tbl.Metas).
SET(meta).
WHERE(tbl.ID.EQ(Int64(id)))
m.log.Infof("sql: %s", stmt.DebugSql())
if _, err := stmt.ExecContext(ctx, db); err != nil {
m.log.Errorf("error updating media metas: %v", err)
return err
}
m.log.Infof("media (%d) metas updated successfully", id)
return nil
}

View File

@@ -0,0 +1,7 @@
package fields
type MediaMetas struct {
ParentHash string `json:"parent_hash,omitempty"`
Short bool `json:"short,omitempty"`
Duration int64 `json:"duration,omitempty"`
}
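
For reference, a self-contained sketch of how a MediaMetas value serializes into the new jsonb metas column; the struct is copied from the diff above so the snippet compiles on its own, and the parent hash is a placeholder. Because every field is tagged omitempty, the full-video update made by VideoCutWorker (empty ParentHash, Short false) stores only the duration.

package main

import (
    "encoding/json"
    "fmt"
)

// Copied from quyun/database/fields for a standalone illustration.
type MediaMetas struct {
    ParentHash string `json:"parent_hash,omitempty"`
    Short      bool   `json:"short,omitempty"`
    Duration   int64  `json:"duration,omitempty"`
}

func main() {
    // Preview clip recorded by VideoStoreShortWorker (placeholder hash).
    short, _ := json.Marshal(MediaMetas{ParentHash: "0cc175b9c0f1b6a8", Short: true, Duration: 60})
    fmt.Println(string(short)) // {"parent_hash":"0cc175b9c0f1b6a8","short":true,"duration":60}

    // Original video updated by VideoCutWorker: zero values are omitted.
    full, _ := json.Marshal(MediaMetas{Duration: 4215})
    fmt.Println(string(full)) // {"duration":4215}
}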

View File

@@ -7,6 +7,7 @@ CREATE TABLE medias(
mime_type varchar(128) NOT NULL DEFAULT '',
size int8 NOT NULL DEFAULT 0,
path varchar(255) NOT NULL DEFAULT '',
metas jsonb NOT NULL DEFAULT '{}' ::jsonb,
hash varchar(64) NOT NULL DEFAULT ''
);

View File

@@ -8,15 +8,17 @@
package model
import (
"quyun/database/fields"
"time"
)
type Medias struct {
ID int64 `sql:"primary_key" json:"id"`
CreatedAt time.Time `json:"created_at"`
Name string `json:"name"`
MimeType string `json:"mime_type"`
Size int64 `json:"size"`
Path string `json:"path"`
Hash string `json:"hash"`
Metas fields.Json[fields.MediaMetas] `json:"metas"`
}

View File

@@ -24,6 +24,7 @@ type mediasTable struct {
Size postgres.ColumnInteger
Path postgres.ColumnString
Hash postgres.ColumnString
Metas postgres.ColumnString
AllColumns postgres.ColumnList
MutableColumns postgres.ColumnList
@@ -71,8 +72,9 @@ func newMediasTableImpl(schemaName, tableName, alias string) mediasTable {
SizeColumn = postgres.IntegerColumn("size")
PathColumn = postgres.StringColumn("path")
HashColumn = postgres.StringColumn("hash")
+ MetasColumn = postgres.StringColumn("metas")
- allColumns = postgres.ColumnList{IDColumn, CreatedAtColumn, NameColumn, MimeTypeColumn, SizeColumn, PathColumn, HashColumn}
+ allColumns = postgres.ColumnList{IDColumn, CreatedAtColumn, NameColumn, MimeTypeColumn, SizeColumn, PathColumn, HashColumn, MetasColumn}
- mutableColumns = postgres.ColumnList{CreatedAtColumn, NameColumn, MimeTypeColumn, SizeColumn, PathColumn, HashColumn}
+ mutableColumns = postgres.ColumnList{CreatedAtColumn, NameColumn, MimeTypeColumn, SizeColumn, PathColumn, HashColumn, MetasColumn}
)
return mediasTable{
@@ -86,6 +88,7 @@ func newMediasTableImpl(schemaName, tableName, alias string) mediasTable {
Size: SizeColumn,
Path: PathColumn,
Hash: HashColumn,
Metas: MetasColumn,
AllColumns: allColumns,
MutableColumns: mutableColumns,

View File

@@ -25,3 +25,6 @@ types:
orders:
  status: OrderStatus
  meta: Json[OrderMeta]
medias:
  metas: Json[MediaMetas]

backend/pkg/utils/ffmpeg.go (new file, 120 additions)
View File

@@ -0,0 +1,120 @@
package utils
import (
"bufio"
"context"
"os/exec"
"strconv"
"strings"
"github.com/go-pay/errgroup"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
func GetMediaDuration(path string) (int64, error) {
// use ffprobe to get media duration
// ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 <file>
cmd := exec.Command("ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", path)
durationOutput, err := cmd.Output()
if err != nil {
return 0, errors.Wrap(err, "ffprobe error")
}
duration := strings.TrimSpace(string(durationOutput))
// ffprobe reports the duration in fractional seconds (e.g. "63.416000"),
// so parse it as a float before truncating to whole seconds.
durationFloat, err := strconv.ParseFloat(duration, 64)
if err != nil {
return 0, errors.Wrap(err, "duration conversion error")
}
return int64(durationFloat), nil
}
func CutMedia(input, output string, start, end int64) error {
// Runs: ffmpeg -ss <start> -i <input> -t <end> -c copy <output>
// Note: end is passed to -t, so it acts as the clip duration from the seek
// point (the caller passes 0 and 60 to keep the first minute).
cmd := exec.Command("ffmpeg", "-ss", strconv.FormatInt(start, 10), "-i", input, "-t", strconv.FormatInt(end, 10), "-c", "copy", output)
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Errorf("Error creating stdout pipe: %v", err)
return err
}
stderr, err := cmd.StderrPipe()
if err != nil {
log.Errorf("Error creating stderr pipe: %v", err)
return err
}
if err := cmd.Start(); err != nil {
log.Errorf("Error starting command: %v", err)
return err
}
var eg errgroup.Group
eg.Go(func(ctx context.Context) error {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
log.Info(scanner.Text())
}
return nil
})
eg.Go(func(ctx context.Context) error {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
log.Error(scanner.Text())
}
return nil
})
// Drain stdout/stderr before cmd.Wait, which closes the pipes once the
// process exits.
if err := eg.Wait(); err != nil {
log.Errorf("Error reading command output: %v", err)
return err
}
if err := cmd.Wait(); err != nil {
log.Errorf("Error waiting for command: %v", err)
return err
}
return nil
}
// GetFrameImageFromVideo extracts target time frame from a video file and saves it as an image.
func GetFrameImageFromVideo(input, output string, time int64) error {
// ffmpeg -i input.mp4 -ss 00:00:01 -vframes 1 output.jpg
cmd := exec.Command("ffmpeg", "-i", input, "-ss", strconv.FormatInt(time, 10), "-vframes", "1", output)
stdout, err := cmd.StdoutPipe()
if err != nil {
return errors.Wrap(err, "stdout pipe error")
}
stderr, err := cmd.StderrPipe()
if err != nil {
return errors.Wrap(err, "stderr pipe error")
}
if err := cmd.Start(); err != nil {
return errors.Wrap(err, "command start error")
}
var eg errgroup.Group
eg.Go(func(ctx context.Context) error {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
log.Info(scanner.Text())
}
return nil
})
eg.Go(func(ctx context.Context) error {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
log.Error(scanner.Text())
}
return nil
})
// Drain stdout/stderr before cmd.Wait, which closes the pipes once the
// process exits.
if err := eg.Wait(); err != nil {
return errors.Wrap(err, "output scan error")
}
if err := cmd.Wait(); err != nil {
return errors.Wrap(err, "command wait error")
}
return nil
}
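
A minimal usage sketch of the three helpers above, assuming the quyun/pkg/utils import path from this repository; the local file paths are placeholders and ffmpeg/ffprobe must be available on PATH.

package main

import (
    "log"

    "quyun/pkg/utils"
)

func main() {
    input := "/tmp/sample.mp4"       // placeholder source video
    short := "/tmp/sample-short.mp4" // 60-second preview clip
    cover := "/tmp/sample.jpg"       // extracted head image

    // Whole-second duration reported by ffprobe.
    duration, err := utils.GetMediaDuration(input)
    if err != nil {
        log.Fatalf("duration: %v", err)
    }
    log.Printf("duration: %ds", duration)

    // First 60 seconds, stream-copied without re-encoding.
    if err := utils.CutMedia(input, short, 0, 60); err != nil {
        log.Fatalf("cut: %v", err)
    }

    // Single frame at the 1-second mark, written as a JPEG.
    if err := utils.GetFrameImageFromVideo(input, cover, 1); err != nil {
        log.Fatalf("frame: %v", err)
    }
}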

View File

@@ -31,3 +31,12 @@ func GetFileMd5(file string) (string, error) {
return fmt.Sprintf("%x", h.Sum(nil)), nil
}
// GetFileSize returns the size of the file in bytes.
func GetFileSize(file string) (int64, error) {
fi, err := os.Stat(file)
if err != nil {
return 0, err
}
return fi.Size(), nil
}

View File

@@ -2,6 +2,7 @@ package ali
import (
"context"
"path/filepath"
"strings"
"time"
@@ -14,6 +15,10 @@ type OSSClient struct {
config *Config
}
func (c *OSSClient) GetSavePath(path string) string {
return filepath.Join("quyun", strings.Trim(path, "/"))
}
func (c *OSSClient) GetClient() *oss.Client {
return c.client
}
@@ -21,7 +26,7 @@ func (c *OSSClient) GetClient() *oss.Client {
func (c *OSSClient) PreSignUpload(ctx context.Context, path, mimeType string) (*oss.PresignResult, error) {
request := &oss.PutObjectRequest{
Bucket: oss.Ptr(c.config.Bucket),
- Key: oss.Ptr("quyun/" + strings.Trim(path, "/")),
+ Key: oss.Ptr(c.GetSavePath(path)),
ContentType: oss.Ptr(mimeType),
}
return c.client.Presign(ctx, request)
@@ -67,3 +72,16 @@ func (c *OSSClient) Delete(ctx context.Context, path string) error {
}
return nil
}
// Upload puts a local file to OSS under the key returned by GetSavePath.
func (c *OSSClient) Upload(ctx context.Context, input, dst string) error {
request := &oss.PutObjectRequest{
Bucket: oss.Ptr(c.config.Bucket),
Key: oss.Ptr(c.GetSavePath(dst)),
}
if _, err := c.internalClient.PutObjectFromFile(ctx, request, input); err != nil {
return err
}
return nil
}