diff --git a/backend/app/jobs/media_process_job.go b/backend/app/jobs/media_process_job.go index 0a1eb23..33dfeff 100644 --- a/backend/app/jobs/media_process_job.go +++ b/backend/app/jobs/media_process_job.go @@ -114,7 +114,49 @@ func (j *MediaProcessWorker) Work(ctx context.Context, job *river.Job[args.Media } } } else { - log.Warn("non-local provider, skipping ffmpeg processing") + tempDir, err := os.MkdirTemp("", "media-process-") + if err != nil { + log.Errorf("create temp dir failed: %v", err) + finalStatus = consts.MediaAssetStatusFailed + } else { + defer os.RemoveAll(tempDir) + ext := path.Ext(asset.ObjectKey) + inputFile := filepath.Join(tempDir, "source"+ext) + if err := j.storage.Download(ctx, asset.ObjectKey, inputFile); err != nil { + log.Errorf("download media file failed: %s, err=%v", asset.ObjectKey, err) + finalStatus = consts.MediaAssetStatusFailed + } else if _, err := exec.LookPath("ffmpeg"); err != nil { + log.Warn("ffmpeg not found, skipping real transcoding") + } else { + coverFile := filepath.Join(tempDir, "cover.jpg") + cmd := exec.CommandContext( + ctx, + "ffmpeg", + "-y", + "-i", + inputFile, + "-ss", + "00:00:00.000", + "-vframes", + "1", + "-vf", + "format=yuv420p", + "-update", + "1", + coverFile, + ) + if out, err := cmd.CombinedOutput(); err != nil { + log.Errorf("ffmpeg failed: %s, output: %s", err, string(out)) + finalStatus = consts.MediaAssetStatusFailed + } else { + log.Infof("Generated cover: %s", coverFile) + if err := j.registerCoverAsset(ctx, asset, coverFile); err != nil { + log.Errorf("register cover failed: %s", err) + finalStatus = consts.MediaAssetStatusFailed + } + } + } + } } } @@ -175,21 +217,27 @@ func (j *MediaProcessWorker) registerCoverAsset(ctx context.Context, asset *mode coverName := coverFilename(filename) objectKey := buildObjectKey(tenant, hash, coverName) - // 本地存储将文件移动到目标 objectKey 位置,保持路径规范。 - localPath := j.storage.Config.LocalPath - if localPath == "" { - localPath = "./storage" - } - dstPath := filepath.Join(localPath, filepath.FromSlash(objectKey)) - if coverFile != dstPath { - if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil { - return err - } - if _, err := os.Stat(dstPath); err == nil { - _ = os.Remove(coverFile) - } else if err := os.Rename(coverFile, dstPath); err != nil { + if strings.ToLower(asset.Provider) == "local" { + localPath := j.storage.Config.LocalPath + if localPath == "" { + localPath = "./storage" + } + dstPath := filepath.Join(localPath, filepath.FromSlash(objectKey)) + if coverFile != dstPath { + if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil { + return err + } + if _, err := os.Stat(dstPath); err == nil { + _ = os.Remove(coverFile) + } else if err := os.Rename(coverFile, dstPath); err != nil { + return err + } + } + } else { + if err := j.storage.PutObject(ctx, objectKey, coverFile, "image/jpeg"); err != nil { return err } + _ = os.Remove(coverFile) } coverAsset := &models.MediaAsset{ diff --git a/backend/app/jobs/media_process_job_test.go b/backend/app/jobs/media_process_job_test.go new file mode 100644 index 0000000..fc5718b --- /dev/null +++ b/backend/app/jobs/media_process_job_test.go @@ -0,0 +1,257 @@ +package jobs + +import ( + "database/sql" + "io" + "os" + "os/exec" + "path" + "path/filepath" + "testing" + + "quyun/v2/app/commands/testx" + "quyun/v2/app/jobs/args" + "quyun/v2/database" + "quyun/v2/database/fields" + "quyun/v2/database/models" + "quyun/v2/pkg/consts" + "quyun/v2/providers/storage" + + "github.com/riverqueue/river" + "github.com/rogeecn/fabfile" + . "github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/suite" + "go.ipao.vip/atom/contracts" + "go.ipao.vip/gen/types" + "go.uber.org/dig" +) + +type MediaProcessWorkerTestSuiteInjectParams struct { + dig.In + + DB *sql.DB + Initials []contracts.Initial `group:"initials"` + Storage *storage.Storage +} + +type MediaProcessWorkerLocalSuite struct { + suite.Suite + MediaProcessWorkerTestSuiteInjectParams +} + +type MediaProcessWorkerS3Suite struct { + suite.Suite + MediaProcessWorkerTestSuiteInjectParams +} + +func Test_MediaProcessWorkerLocal(t *testing.T) { + originEnv := os.Getenv("ENV_LOCAL") + if err := os.Setenv("ENV_LOCAL", "test"); err != nil { + t.Fatalf("set ENV_LOCAL failed: %v", err) + } + t.Cleanup(func() { + if originEnv == "" { + _ = os.Unsetenv("ENV_LOCAL") + } else { + _ = os.Setenv("ENV_LOCAL", originEnv) + } + }) + + providers := testx.Default() + testx.Serve(providers, t, func(p MediaProcessWorkerTestSuiteInjectParams) { + suite.Run(t, &MediaProcessWorkerLocalSuite{MediaProcessWorkerTestSuiteInjectParams: p}) + }) +} + +func Test_MediaProcessWorkerS3(t *testing.T) { + originEnv := os.Getenv("ENV_LOCAL") + if err := os.Setenv("ENV_LOCAL", "minio"); err != nil { + t.Fatalf("set ENV_LOCAL failed: %v", err) + } + t.Cleanup(func() { + if originEnv == "" { + _ = os.Unsetenv("ENV_LOCAL") + } else { + _ = os.Setenv("ENV_LOCAL", originEnv) + } + }) + + providers := testx.Default() + testx.Serve(providers, t, func(p MediaProcessWorkerTestSuiteInjectParams) { + suite.Run(t, &MediaProcessWorkerS3Suite{MediaProcessWorkerTestSuiteInjectParams: p}) + }) +} + +func (s *MediaProcessWorkerLocalSuite) Test_Work_Local() { + Convey("Work Local", s.T(), func() { + if _, err := exec.LookPath("ffmpeg"); err != nil { + s.T().Skip("ffmpeg not installed") + } + + ctx := s.T().Context() + database.Truncate(ctx, s.DB, models.TableNameMediaAsset) + + tempDir := s.T().TempDir() + fixturePath := fabfile.MustFind("fixtures/demo.mp4") + objectKey := path.Join("quyun", "public", "demo.mp4") + dstPath := filepath.Join(tempDir, filepath.FromSlash(objectKey)) + So(copyFile(fixturePath, dstPath), ShouldBeNil) + + info, err := os.Stat(dstPath) + So(err, ShouldBeNil) + + So(s.Storage, ShouldNotBeNil) + s.Storage.Config.Type = "local" + s.Storage.Config.LocalPath = tempDir + + asset := &models.MediaAsset{ + TenantID: 1, + UserID: 1, + Type: consts.MediaAssetTypeVideo, + Status: consts.MediaAssetStatusUploaded, + Provider: s.Storage.Provider(), + Bucket: s.Storage.Bucket(), + ObjectKey: objectKey, + Meta: types.NewJSONType(fields.MediaAssetMeta{ + Filename: "demo.mp4", + Size: info.Size(), + }), + } + So(models.MediaAssetQuery.WithContext(ctx).Create(asset), ShouldBeNil) + + worker := &MediaProcessWorker{storage: s.Storage} + err = worker.Work(ctx, &river.Job[args.MediaAssetProcessJob]{ + Args: args.MediaAssetProcessJob{ + TenantID: asset.TenantID, + AssetID: asset.ID, + }, + }) + So(err, ShouldBeNil) + + updated, err := models.MediaAssetQuery.WithContext(ctx). + Where(models.MediaAssetQuery.ID.Eq(asset.ID)). + First() + So(err, ShouldBeNil) + So(updated.Status, ShouldEqual, consts.MediaAssetStatusReady) + + tbl, q := models.MediaAssetQuery.QueryContext(ctx) + cover, err := q.Where( + tbl.SourceAssetID.Eq(asset.ID), + tbl.Type.Eq(consts.MediaAssetTypeImage), + ).First() + So(err, ShouldBeNil) + So(cover.ObjectKey, ShouldNotBeBlank) + + coverPath := filepath.Join(tempDir, filepath.FromSlash(cover.ObjectKey)) + _, err = os.Stat(coverPath) + So(err, ShouldBeNil) + }) +} + +func (s *MediaProcessWorkerS3Suite) Test_Work_S3() { + Convey("Work S3", s.T(), func() { + if _, err := exec.LookPath("ffmpeg"); err != nil { + s.T().Skip("ffmpeg not installed") + } + + ctx := s.T().Context() + database.Truncate(ctx, s.DB, models.TableNameMediaAsset) + + So(s.Storage, ShouldNotBeNil) + s.Storage.Config.Type = "s3" + s.Storage.Config.Endpoint = "http://127.0.0.1:9000" + s.Storage.Config.AccessKey = "minioadmin" + s.Storage.Config.SecretKey = "minioadmin" + s.Storage.Config.Region = "us-east-1" + s.Storage.Config.Bucket = "quyun-assets" + s.Storage.Config.PathStyle = true + + fixturePath := fabfile.MustFind("fixtures/demo.mp4") + objectKey := path.Join("quyun", "public", "demo.mp4") + err := s.Storage.PutObject(ctx, objectKey, fixturePath, "video/mp4") + if err != nil { + s.T().Skipf("minio not available: %v", err) + } + s.T().Cleanup(func() { + _ = s.Storage.Delete(objectKey) + }) + + info, err := os.Stat(fixturePath) + So(err, ShouldBeNil) + + asset := &models.MediaAsset{ + TenantID: 1, + UserID: 1, + Type: consts.MediaAssetTypeVideo, + Status: consts.MediaAssetStatusUploaded, + Provider: "s3", + Bucket: s.Storage.Config.Bucket, + ObjectKey: objectKey, + Meta: types.NewJSONType(fields.MediaAssetMeta{ + Filename: "demo.mp4", + Size: info.Size(), + }), + } + So(models.MediaAssetQuery.WithContext(ctx).Create(asset), ShouldBeNil) + + worker := &MediaProcessWorker{storage: s.Storage} + err = worker.Work(ctx, &river.Job[args.MediaAssetProcessJob]{ + Args: args.MediaAssetProcessJob{ + TenantID: asset.TenantID, + AssetID: asset.ID, + }, + }) + So(err, ShouldBeNil) + + updated, err := models.MediaAssetQuery.WithContext(ctx). + Where(models.MediaAssetQuery.ID.Eq(asset.ID)). + First() + So(err, ShouldBeNil) + So(updated.Status, ShouldEqual, consts.MediaAssetStatusReady) + + tbl, q := models.MediaAssetQuery.QueryContext(ctx) + cover, err := q.Where( + tbl.SourceAssetID.Eq(asset.ID), + tbl.Type.Eq(consts.MediaAssetTypeImage), + ).First() + So(err, ShouldBeNil) + So(cover.ObjectKey, ShouldNotBeBlank) + So(cover.Provider, ShouldEqual, "s3") + + s.T().Cleanup(func() { + if cover.ObjectKey != "" { + _ = s.Storage.Delete(cover.ObjectKey) + } + }) + + downloadDir := s.T().TempDir() + downloadPath := filepath.Join(downloadDir, "cover.jpg") + So(s.Storage.Download(ctx, cover.ObjectKey, downloadPath), ShouldBeNil) + + downloadInfo, err := os.Stat(downloadPath) + So(err, ShouldBeNil) + So(downloadInfo.Size(), ShouldBeGreaterThan, 0) + }) +} + +func copyFile(srcPath, dstPath string) error { + if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil { + return err + } + src, err := os.Open(srcPath) + if err != nil { + return err + } + defer src.Close() + + dst, err := os.Create(dstPath) + if err != nil { + return err + } + defer dst.Close() + + if _, err := io.Copy(dst, src); err != nil { + return err + } + return nil +} diff --git a/backend/providers/storage/provider.go b/backend/providers/storage/provider.go index 94b1a49..da12014 100644 --- a/backend/providers/storage/provider.go +++ b/backend/providers/storage/provider.go @@ -6,6 +6,7 @@ import ( "crypto/sha256" "encoding/hex" "fmt" + "io" "net/url" "os" "path/filepath" @@ -63,6 +64,44 @@ type Storage struct { s3Client *minio.Client } +func (s *Storage) Download(ctx context.Context, key, filePath string) error { + if s.storageType() == "local" { + localPath := s.Config.LocalPath + if localPath == "" { + localPath = "./storage" + } + srcPath := filepath.Join(localPath, key) + if err := os.MkdirAll(filepath.Dir(filePath), 0o755); err != nil { + return err + } + src, err := os.Open(srcPath) + if err != nil { + return err + } + defer src.Close() + + dst, err := os.Create(filePath) + if err != nil { + return err + } + defer dst.Close() + + if _, err := io.Copy(dst, src); err != nil { + return err + } + return nil + } + + client, err := s.s3ClientForUse() + if err != nil { + return err + } + if err := os.MkdirAll(filepath.Dir(filePath), 0o755); err != nil { + return err + } + return client.FGetObject(ctx, s.Config.Bucket, key, filePath, minio.GetObjectOptions{}) +} + func (s *Storage) Delete(key string) error { if s.storageType() == "local" { localPath := s.Config.LocalPath diff --git a/docs/todo_list.md b/docs/todo_list.md index 1c18aae..990197b 100644 --- a/docs/todo_list.md +++ b/docs/todo_list.md @@ -348,7 +348,7 @@ - ✅ MinIO S3 Provider 配置可用(`config.prod.toml`) - ✅ 上传/访问/删除完整链路验证通过 -### 17) 媒体处理管线适配对象存储(S3/MinIO) +### 17) 媒体处理管线适配对象存储(S3/MinIO)(已完成) **需求目标** - 在对象存储模式下,媒体处理任务可完整执行并回传产物。 @@ -356,10 +356,14 @@ - Worker:从对象存储下载源文件到临时目录 → FFmpeg 处理 → 结果上传回对象存储 → 清理临时文件。 - 产物:封面/预览片段自动生成并回写 `media_assets`。 - 本地 FS 仍保留兼容路径(开发/测试使用)。 - - 进度:本地视频处理已可生成封面资产(ffmpeg 可用时)。 +- 已完成: + - `Storage.Download` 支持 local copy + S3 FGetObject + - `MediaProcessWorker` 支持对象存储流程(下载 → FFmpeg → 上传封面 → 清理) + - 封面派生资产在 S3 模式走 `PutObject`(不再 rename) **测试方案** -- 对象存储模式下上传视频触发处理,封面/预览可访问;任务异常可重试。 +- ✅ `ENV_LOCAL=test go test ./backend/app/jobs -run Test_MediaProcessWorkerLocal -count=1` +- ✅ `ENV_LOCAL=minio go test ./backend/app/jobs -run Test_MediaProcessWorkerS3 -count=1` ### 18) 支付集成 **需求目标** @@ -393,6 +397,7 @@ - 性能优化(避免 N+1:订单/租户列表批量聚合 + topics 聚合)。 - 多租户强隔离(/t/:tenantCode/v1 + TenantResolver)。 - 真实存储 Provider 接入(生产 MinIO S3 配置与 E2E 验证)。 +- 媒体处理管线适配对象存储(S3/MinIO)。 ## 里程碑建议 - M1:完成 P0