feat: support S3 media processing pipeline

This commit is contained in:
2026-02-04 19:15:44 +08:00
parent 8f7000dc8d
commit 57b7269215
4 changed files with 366 additions and 17 deletions

View File

@@ -114,7 +114,49 @@ func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.Media
}
}
} else {
log.Warn("non-local provider, skipping ffmpeg processing")
tempDir, err := os.MkdirTemp("", "media-process-")
if err != nil {
log.Errorf("create temp dir failed: %v", err)
finalStatus = consts.MediaAssetStatusFailed
} else {
defer os.RemoveAll(tempDir)
ext := path.Ext(asset.ObjectKey)
inputFile := filepath.Join(tempDir, "source"+ext)
if err := j.storage.Download(ctx, asset.ObjectKey, inputFile); err != nil {
log.Errorf("download media file failed: %s, err=%v", asset.ObjectKey, err)
finalStatus = consts.MediaAssetStatusFailed
} else if _, err := exec.LookPath("ffmpeg"); err != nil {
log.Warn("ffmpeg not found, skipping real transcoding")
} else {
coverFile := filepath.Join(tempDir, "cover.jpg")
cmd := exec.CommandContext(
ctx,
"ffmpeg",
"-y",
"-i",
inputFile,
"-ss",
"00:00:00.000",
"-vframes",
"1",
"-vf",
"format=yuv420p",
"-update",
"1",
coverFile,
)
if out, err := cmd.CombinedOutput(); err != nil {
log.Errorf("ffmpeg failed: %s, output: %s", err, string(out))
finalStatus = consts.MediaAssetStatusFailed
} else {
log.Infof("Generated cover: %s", coverFile)
if err := j.registerCoverAsset(ctx, asset, coverFile); err != nil {
log.Errorf("register cover failed: %s", err)
finalStatus = consts.MediaAssetStatusFailed
}
}
}
}
}
}
@@ -175,21 +217,27 @@ func (j *MediaProcessWorker) registerCoverAsset(ctx context.Context, asset *mode
coverName := coverFilename(filename)
objectKey := buildObjectKey(tenant, hash, coverName)
// 本地存储将文件移动到目标 objectKey 位置,保持路径规范。
localPath := j.storage.Config.LocalPath
if localPath == "" {
localPath = "./storage"
}
dstPath := filepath.Join(localPath, filepath.FromSlash(objectKey))
if coverFile != dstPath {
if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil {
return err
}
if _, err := os.Stat(dstPath); err == nil {
_ = os.Remove(coverFile)
} else if err := os.Rename(coverFile, dstPath); err != nil {
if strings.ToLower(asset.Provider) == "local" {
localPath := j.storage.Config.LocalPath
if localPath == "" {
localPath = "./storage"
}
dstPath := filepath.Join(localPath, filepath.FromSlash(objectKey))
if coverFile != dstPath {
if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil {
return err
}
if _, err := os.Stat(dstPath); err == nil {
_ = os.Remove(coverFile)
} else if err := os.Rename(coverFile, dstPath); err != nil {
return err
}
}
} else {
if err := j.storage.PutObject(ctx, objectKey, coverFile, "image/jpeg"); err != nil {
return err
}
_ = os.Remove(coverFile)
}
coverAsset := &models.MediaAsset{

View File

@@ -0,0 +1,257 @@
package jobs
import (
"database/sql"
"io"
"os"
"os/exec"
"path"
"path/filepath"
"testing"
"quyun/v2/app/commands/testx"
"quyun/v2/app/jobs/args"
"quyun/v2/database"
"quyun/v2/database/fields"
"quyun/v2/database/models"
"quyun/v2/pkg/consts"
"quyun/v2/providers/storage"
"github.com/riverqueue/river"
"github.com/rogeecn/fabfile"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/suite"
"go.ipao.vip/atom/contracts"
"go.ipao.vip/gen/types"
"go.uber.org/dig"
)
type MediaProcessWorkerTestSuiteInjectParams struct {
dig.In
DB *sql.DB
Initials []contracts.Initial `group:"initials"`
Storage *storage.Storage
}
type MediaProcessWorkerLocalSuite struct {
suite.Suite
MediaProcessWorkerTestSuiteInjectParams
}
type MediaProcessWorkerS3Suite struct {
suite.Suite
MediaProcessWorkerTestSuiteInjectParams
}
func Test_MediaProcessWorkerLocal(t *testing.T) {
originEnv := os.Getenv("ENV_LOCAL")
if err := os.Setenv("ENV_LOCAL", "test"); err != nil {
t.Fatalf("set ENV_LOCAL failed: %v", err)
}
t.Cleanup(func() {
if originEnv == "" {
_ = os.Unsetenv("ENV_LOCAL")
} else {
_ = os.Setenv("ENV_LOCAL", originEnv)
}
})
providers := testx.Default()
testx.Serve(providers, t, func(p MediaProcessWorkerTestSuiteInjectParams) {
suite.Run(t, &MediaProcessWorkerLocalSuite{MediaProcessWorkerTestSuiteInjectParams: p})
})
}
func Test_MediaProcessWorkerS3(t *testing.T) {
originEnv := os.Getenv("ENV_LOCAL")
if err := os.Setenv("ENV_LOCAL", "minio"); err != nil {
t.Fatalf("set ENV_LOCAL failed: %v", err)
}
t.Cleanup(func() {
if originEnv == "" {
_ = os.Unsetenv("ENV_LOCAL")
} else {
_ = os.Setenv("ENV_LOCAL", originEnv)
}
})
providers := testx.Default()
testx.Serve(providers, t, func(p MediaProcessWorkerTestSuiteInjectParams) {
suite.Run(t, &MediaProcessWorkerS3Suite{MediaProcessWorkerTestSuiteInjectParams: p})
})
}
func (s *MediaProcessWorkerLocalSuite) Test_Work_Local() {
Convey("Work Local", s.T(), func() {
if _, err := exec.LookPath("ffmpeg"); err != nil {
s.T().Skip("ffmpeg not installed")
}
ctx := s.T().Context()
database.Truncate(ctx, s.DB, models.TableNameMediaAsset)
tempDir := s.T().TempDir()
fixturePath := fabfile.MustFind("fixtures/demo.mp4")
objectKey := path.Join("quyun", "public", "demo.mp4")
dstPath := filepath.Join(tempDir, filepath.FromSlash(objectKey))
So(copyFile(fixturePath, dstPath), ShouldBeNil)
info, err := os.Stat(dstPath)
So(err, ShouldBeNil)
So(s.Storage, ShouldNotBeNil)
s.Storage.Config.Type = "local"
s.Storage.Config.LocalPath = tempDir
asset := &models.MediaAsset{
TenantID: 1,
UserID: 1,
Type: consts.MediaAssetTypeVideo,
Status: consts.MediaAssetStatusUploaded,
Provider: s.Storage.Provider(),
Bucket: s.Storage.Bucket(),
ObjectKey: objectKey,
Meta: types.NewJSONType(fields.MediaAssetMeta{
Filename: "demo.mp4",
Size: info.Size(),
}),
}
So(models.MediaAssetQuery.WithContext(ctx).Create(asset), ShouldBeNil)
worker := &MediaProcessWorker{storage: s.Storage}
err = worker.Work(ctx, &river.Job[args.MediaAssetProcessJob]{
Args: args.MediaAssetProcessJob{
TenantID: asset.TenantID,
AssetID: asset.ID,
},
})
So(err, ShouldBeNil)
updated, err := models.MediaAssetQuery.WithContext(ctx).
Where(models.MediaAssetQuery.ID.Eq(asset.ID)).
First()
So(err, ShouldBeNil)
So(updated.Status, ShouldEqual, consts.MediaAssetStatusReady)
tbl, q := models.MediaAssetQuery.QueryContext(ctx)
cover, err := q.Where(
tbl.SourceAssetID.Eq(asset.ID),
tbl.Type.Eq(consts.MediaAssetTypeImage),
).First()
So(err, ShouldBeNil)
So(cover.ObjectKey, ShouldNotBeBlank)
coverPath := filepath.Join(tempDir, filepath.FromSlash(cover.ObjectKey))
_, err = os.Stat(coverPath)
So(err, ShouldBeNil)
})
}
func (s *MediaProcessWorkerS3Suite) Test_Work_S3() {
Convey("Work S3", s.T(), func() {
if _, err := exec.LookPath("ffmpeg"); err != nil {
s.T().Skip("ffmpeg not installed")
}
ctx := s.T().Context()
database.Truncate(ctx, s.DB, models.TableNameMediaAsset)
So(s.Storage, ShouldNotBeNil)
s.Storage.Config.Type = "s3"
s.Storage.Config.Endpoint = "http://127.0.0.1:9000"
s.Storage.Config.AccessKey = "minioadmin"
s.Storage.Config.SecretKey = "minioadmin"
s.Storage.Config.Region = "us-east-1"
s.Storage.Config.Bucket = "quyun-assets"
s.Storage.Config.PathStyle = true
fixturePath := fabfile.MustFind("fixtures/demo.mp4")
objectKey := path.Join("quyun", "public", "demo.mp4")
err := s.Storage.PutObject(ctx, objectKey, fixturePath, "video/mp4")
if err != nil {
s.T().Skipf("minio not available: %v", err)
}
s.T().Cleanup(func() {
_ = s.Storage.Delete(objectKey)
})
info, err := os.Stat(fixturePath)
So(err, ShouldBeNil)
asset := &models.MediaAsset{
TenantID: 1,
UserID: 1,
Type: consts.MediaAssetTypeVideo,
Status: consts.MediaAssetStatusUploaded,
Provider: "s3",
Bucket: s.Storage.Config.Bucket,
ObjectKey: objectKey,
Meta: types.NewJSONType(fields.MediaAssetMeta{
Filename: "demo.mp4",
Size: info.Size(),
}),
}
So(models.MediaAssetQuery.WithContext(ctx).Create(asset), ShouldBeNil)
worker := &MediaProcessWorker{storage: s.Storage}
err = worker.Work(ctx, &river.Job[args.MediaAssetProcessJob]{
Args: args.MediaAssetProcessJob{
TenantID: asset.TenantID,
AssetID: asset.ID,
},
})
So(err, ShouldBeNil)
updated, err := models.MediaAssetQuery.WithContext(ctx).
Where(models.MediaAssetQuery.ID.Eq(asset.ID)).
First()
So(err, ShouldBeNil)
So(updated.Status, ShouldEqual, consts.MediaAssetStatusReady)
tbl, q := models.MediaAssetQuery.QueryContext(ctx)
cover, err := q.Where(
tbl.SourceAssetID.Eq(asset.ID),
tbl.Type.Eq(consts.MediaAssetTypeImage),
).First()
So(err, ShouldBeNil)
So(cover.ObjectKey, ShouldNotBeBlank)
So(cover.Provider, ShouldEqual, "s3")
s.T().Cleanup(func() {
if cover.ObjectKey != "" {
_ = s.Storage.Delete(cover.ObjectKey)
}
})
downloadDir := s.T().TempDir()
downloadPath := filepath.Join(downloadDir, "cover.jpg")
So(s.Storage.Download(ctx, cover.ObjectKey, downloadPath), ShouldBeNil)
downloadInfo, err := os.Stat(downloadPath)
So(err, ShouldBeNil)
So(downloadInfo.Size(), ShouldBeGreaterThan, 0)
})
}
func copyFile(srcPath, dstPath string) error {
if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil {
return err
}
src, err := os.Open(srcPath)
if err != nil {
return err
}
defer src.Close()
dst, err := os.Create(dstPath)
if err != nil {
return err
}
defer dst.Close()
if _, err := io.Copy(dst, src); err != nil {
return err
}
return nil
}

View File

@@ -6,6 +6,7 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
@@ -63,6 +64,44 @@ type Storage struct {
s3Client *minio.Client
}
func (s *Storage) Download(ctx context.Context, key, filePath string) error {
if s.storageType() == "local" {
localPath := s.Config.LocalPath
if localPath == "" {
localPath = "./storage"
}
srcPath := filepath.Join(localPath, key)
if err := os.MkdirAll(filepath.Dir(filePath), 0o755); err != nil {
return err
}
src, err := os.Open(srcPath)
if err != nil {
return err
}
defer src.Close()
dst, err := os.Create(filePath)
if err != nil {
return err
}
defer dst.Close()
if _, err := io.Copy(dst, src); err != nil {
return err
}
return nil
}
client, err := s.s3ClientForUse()
if err != nil {
return err
}
if err := os.MkdirAll(filepath.Dir(filePath), 0o755); err != nil {
return err
}
return client.FGetObject(ctx, s.Config.Bucket, key, filePath, minio.GetObjectOptions{})
}
func (s *Storage) Delete(key string) error {
if s.storageType() == "local" {
localPath := s.Config.LocalPath

View File

@@ -348,7 +348,7 @@
- ✅ MinIO S3 Provider 配置可用(`config.prod.toml`
- ✅ 上传/访问/删除完整链路验证通过
### 17) 媒体处理管线适配对象存储S3/MinIO
### 17) 媒体处理管线适配对象存储S3/MinIO(已完成)
**需求目标**
- 在对象存储模式下,媒体处理任务可完整执行并回传产物。
@@ -356,10 +356,14 @@
- Worker从对象存储下载源文件到临时目录 → FFmpeg 处理 → 结果上传回对象存储 → 清理临时文件。
- 产物:封面/预览片段自动生成并回写 `media_assets`
- 本地 FS 仍保留兼容路径(开发/测试使用)。
- 进度本地视频处理已可生成封面资产ffmpeg 可用时)。
- 已完成:
- `Storage.Download` 支持 local copy + S3 FGetObject
- `MediaProcessWorker` 支持对象存储流程(下载 → FFmpeg → 上传封面 → 清理)
- 封面派生资产在 S3 模式走 `PutObject`(不再 rename
**测试方案**
- 对象存储模式下上传视频触发处理,封面/预览可访问;任务异常可重试。
- `ENV_LOCAL=test go test ./backend/app/jobs -run Test_MediaProcessWorkerLocal -count=1`
-`ENV_LOCAL=minio go test ./backend/app/jobs -run Test_MediaProcessWorkerS3 -count=1`
### 18) 支付集成
**需求目标**
@@ -393,6 +397,7 @@
- 性能优化(避免 N+1订单/租户列表批量聚合 + topics 聚合)。
- 多租户强隔离(/t/:tenantCode/v1 + TenantResolver
- 真实存储 Provider 接入(生产 MinIO S3 配置与 E2E 验证)。
- 媒体处理管线适配对象存储S3/MinIO
## 里程碑建议
- M1完成 P0