258 lines
6.6 KiB
Go
258 lines
6.6 KiB
Go
package jobs
|
|
|
|
import (
|
|
"database/sql"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"path"
|
|
"path/filepath"
|
|
"testing"
|
|
|
|
"quyun/v2/app/commands/testx"
|
|
"quyun/v2/app/jobs/args"
|
|
"quyun/v2/database"
|
|
"quyun/v2/database/fields"
|
|
"quyun/v2/database/models"
|
|
"quyun/v2/pkg/consts"
|
|
"quyun/v2/providers/storage"
|
|
|
|
"github.com/riverqueue/river"
|
|
"github.com/rogeecn/fabfile"
|
|
. "github.com/smartystreets/goconvey/convey"
|
|
"github.com/stretchr/testify/suite"
|
|
"go.ipao.vip/atom/contracts"
|
|
"go.ipao.vip/gen/types"
|
|
"go.uber.org/dig"
|
|
)
|
|
|
|
type MediaProcessWorkerTestSuiteInjectParams struct {
|
|
dig.In
|
|
|
|
DB *sql.DB
|
|
Initials []contracts.Initial `group:"initials"`
|
|
Storage *storage.Storage
|
|
}
|
|
|
|
type MediaProcessWorkerLocalSuite struct {
|
|
suite.Suite
|
|
MediaProcessWorkerTestSuiteInjectParams
|
|
}
|
|
|
|
type MediaProcessWorkerS3Suite struct {
|
|
suite.Suite
|
|
MediaProcessWorkerTestSuiteInjectParams
|
|
}
|
|
|
|
func Test_MediaProcessWorkerLocal(t *testing.T) {
|
|
originEnv := os.Getenv("ENV_LOCAL")
|
|
if err := os.Setenv("ENV_LOCAL", "test"); err != nil {
|
|
t.Fatalf("set ENV_LOCAL failed: %v", err)
|
|
}
|
|
t.Cleanup(func() {
|
|
if originEnv == "" {
|
|
_ = os.Unsetenv("ENV_LOCAL")
|
|
} else {
|
|
_ = os.Setenv("ENV_LOCAL", originEnv)
|
|
}
|
|
})
|
|
|
|
providers := testx.Default()
|
|
testx.Serve(providers, t, func(p MediaProcessWorkerTestSuiteInjectParams) {
|
|
suite.Run(t, &MediaProcessWorkerLocalSuite{MediaProcessWorkerTestSuiteInjectParams: p})
|
|
})
|
|
}
|
|
|
|
func Test_MediaProcessWorkerS3(t *testing.T) {
|
|
originEnv := os.Getenv("ENV_LOCAL")
|
|
if err := os.Setenv("ENV_LOCAL", "minio"); err != nil {
|
|
t.Fatalf("set ENV_LOCAL failed: %v", err)
|
|
}
|
|
t.Cleanup(func() {
|
|
if originEnv == "" {
|
|
_ = os.Unsetenv("ENV_LOCAL")
|
|
} else {
|
|
_ = os.Setenv("ENV_LOCAL", originEnv)
|
|
}
|
|
})
|
|
|
|
providers := testx.Default()
|
|
testx.Serve(providers, t, func(p MediaProcessWorkerTestSuiteInjectParams) {
|
|
suite.Run(t, &MediaProcessWorkerS3Suite{MediaProcessWorkerTestSuiteInjectParams: p})
|
|
})
|
|
}
|
|
|
|
func (s *MediaProcessWorkerLocalSuite) Test_Work_Local() {
|
|
Convey("Work Local", s.T(), func() {
|
|
if _, err := exec.LookPath("ffmpeg"); err != nil {
|
|
s.T().Skip("ffmpeg not installed")
|
|
}
|
|
|
|
ctx := s.T().Context()
|
|
database.Truncate(ctx, s.DB, models.TableNameMediaAsset)
|
|
|
|
tempDir := s.T().TempDir()
|
|
fixturePath := fabfile.MustFind("fixtures/demo.mp4")
|
|
objectKey := path.Join("quyun", "public", "demo.mp4")
|
|
dstPath := filepath.Join(tempDir, filepath.FromSlash(objectKey))
|
|
So(copyFile(fixturePath, dstPath), ShouldBeNil)
|
|
|
|
info, err := os.Stat(dstPath)
|
|
So(err, ShouldBeNil)
|
|
|
|
So(s.Storage, ShouldNotBeNil)
|
|
s.Storage.Config.Type = "local"
|
|
s.Storage.Config.LocalPath = tempDir
|
|
|
|
asset := &models.MediaAsset{
|
|
TenantID: 1,
|
|
UserID: 1,
|
|
Type: consts.MediaAssetTypeVideo,
|
|
Status: consts.MediaAssetStatusUploaded,
|
|
Provider: s.Storage.Provider(),
|
|
Bucket: s.Storage.Bucket(),
|
|
ObjectKey: objectKey,
|
|
Meta: types.NewJSONType(fields.MediaAssetMeta{
|
|
Filename: "demo.mp4",
|
|
Size: info.Size(),
|
|
}),
|
|
}
|
|
So(models.MediaAssetQuery.WithContext(ctx).Create(asset), ShouldBeNil)
|
|
|
|
worker := &MediaProcessWorker{storage: s.Storage}
|
|
err = worker.Work(ctx, &river.Job[args.MediaAssetProcessJob]{
|
|
Args: args.MediaAssetProcessJob{
|
|
TenantID: asset.TenantID,
|
|
AssetID: asset.ID,
|
|
},
|
|
})
|
|
So(err, ShouldBeNil)
|
|
|
|
updated, err := models.MediaAssetQuery.WithContext(ctx).
|
|
Where(models.MediaAssetQuery.ID.Eq(asset.ID)).
|
|
First()
|
|
So(err, ShouldBeNil)
|
|
So(updated.Status, ShouldEqual, consts.MediaAssetStatusReady)
|
|
|
|
tbl, q := models.MediaAssetQuery.QueryContext(ctx)
|
|
cover, err := q.Where(
|
|
tbl.SourceAssetID.Eq(asset.ID),
|
|
tbl.Type.Eq(consts.MediaAssetTypeImage),
|
|
).First()
|
|
So(err, ShouldBeNil)
|
|
So(cover.ObjectKey, ShouldNotBeBlank)
|
|
|
|
coverPath := filepath.Join(tempDir, filepath.FromSlash(cover.ObjectKey))
|
|
_, err = os.Stat(coverPath)
|
|
So(err, ShouldBeNil)
|
|
})
|
|
}
|
|
|
|
func (s *MediaProcessWorkerS3Suite) Test_Work_S3() {
|
|
Convey("Work S3", s.T(), func() {
|
|
if _, err := exec.LookPath("ffmpeg"); err != nil {
|
|
s.T().Skip("ffmpeg not installed")
|
|
}
|
|
|
|
ctx := s.T().Context()
|
|
database.Truncate(ctx, s.DB, models.TableNameMediaAsset)
|
|
|
|
So(s.Storage, ShouldNotBeNil)
|
|
s.Storage.Config.Type = "s3"
|
|
s.Storage.Config.Endpoint = "http://127.0.0.1:9000"
|
|
s.Storage.Config.AccessKey = "minioadmin"
|
|
s.Storage.Config.SecretKey = "minioadmin"
|
|
s.Storage.Config.Region = "us-east-1"
|
|
s.Storage.Config.Bucket = "quyun-assets"
|
|
s.Storage.Config.PathStyle = true
|
|
|
|
fixturePath := fabfile.MustFind("fixtures/demo.mp4")
|
|
objectKey := path.Join("quyun", "public", "demo.mp4")
|
|
err := s.Storage.PutObject(ctx, objectKey, fixturePath, "video/mp4")
|
|
if err != nil {
|
|
s.T().Skipf("minio not available: %v", err)
|
|
}
|
|
s.T().Cleanup(func() {
|
|
_ = s.Storage.Delete(objectKey)
|
|
})
|
|
|
|
info, err := os.Stat(fixturePath)
|
|
So(err, ShouldBeNil)
|
|
|
|
asset := &models.MediaAsset{
|
|
TenantID: 1,
|
|
UserID: 1,
|
|
Type: consts.MediaAssetTypeVideo,
|
|
Status: consts.MediaAssetStatusUploaded,
|
|
Provider: "s3",
|
|
Bucket: s.Storage.Config.Bucket,
|
|
ObjectKey: objectKey,
|
|
Meta: types.NewJSONType(fields.MediaAssetMeta{
|
|
Filename: "demo.mp4",
|
|
Size: info.Size(),
|
|
}),
|
|
}
|
|
So(models.MediaAssetQuery.WithContext(ctx).Create(asset), ShouldBeNil)
|
|
|
|
worker := &MediaProcessWorker{storage: s.Storage}
|
|
err = worker.Work(ctx, &river.Job[args.MediaAssetProcessJob]{
|
|
Args: args.MediaAssetProcessJob{
|
|
TenantID: asset.TenantID,
|
|
AssetID: asset.ID,
|
|
},
|
|
})
|
|
So(err, ShouldBeNil)
|
|
|
|
updated, err := models.MediaAssetQuery.WithContext(ctx).
|
|
Where(models.MediaAssetQuery.ID.Eq(asset.ID)).
|
|
First()
|
|
So(err, ShouldBeNil)
|
|
So(updated.Status, ShouldEqual, consts.MediaAssetStatusReady)
|
|
|
|
tbl, q := models.MediaAssetQuery.QueryContext(ctx)
|
|
cover, err := q.Where(
|
|
tbl.SourceAssetID.Eq(asset.ID),
|
|
tbl.Type.Eq(consts.MediaAssetTypeImage),
|
|
).First()
|
|
So(err, ShouldBeNil)
|
|
So(cover.ObjectKey, ShouldNotBeBlank)
|
|
So(cover.Provider, ShouldEqual, "s3")
|
|
|
|
s.T().Cleanup(func() {
|
|
if cover.ObjectKey != "" {
|
|
_ = s.Storage.Delete(cover.ObjectKey)
|
|
}
|
|
})
|
|
|
|
downloadDir := s.T().TempDir()
|
|
downloadPath := filepath.Join(downloadDir, "cover.jpg")
|
|
So(s.Storage.Download(ctx, cover.ObjectKey, downloadPath), ShouldBeNil)
|
|
|
|
downloadInfo, err := os.Stat(downloadPath)
|
|
So(err, ShouldBeNil)
|
|
So(downloadInfo.Size(), ShouldBeGreaterThan, 0)
|
|
})
|
|
}
|
|
|
|
func copyFile(srcPath, dstPath string) error {
|
|
if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil {
|
|
return err
|
|
}
|
|
src, err := os.Open(srcPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer src.Close()
|
|
|
|
dst, err := os.Create(dstPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer dst.Close()
|
|
|
|
if _, err := io.Copy(dst, src); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|