From 9323fb423098f8c3d32454f82971dac0c518e746 Mon Sep 17 00:00:00 2001 From: yanghao05 Date: Thu, 17 Apr 2025 20:42:57 +0800 Subject: [PATCH] feat: add download from alioss job --- backend/app/jobs/download_from_alioss.go | 49 ++++++++++++++-- backend/app/jobs/download_from_alioss_test.go | 58 +++++++++++++++++++ backend/app/jobs/provider.gen.go | 6 +- backend/app/service/testx/testing.go | 4 ++ backend/providers/ali/config.go | 6 +- backend/providers/ali/oss_client.go | 20 +++++-- 6 files changed, 129 insertions(+), 14 deletions(-) create mode 100644 backend/app/jobs/download_from_alioss_test.go diff --git a/backend/app/jobs/download_from_alioss.go b/backend/app/jobs/download_from_alioss.go index 3b1f8b2..e14bec2 100644 --- a/backend/app/jobs/download_from_alioss.go +++ b/backend/app/jobs/download_from_alioss.go @@ -2,8 +2,12 @@ package jobs import ( "context" + "os" + "path/filepath" "time" + "quyun/providers/ali" + . "github.com/riverqueue/river" log "github.com/sirupsen/logrus" _ "go.ipao.vip/atom" @@ -14,8 +18,7 @@ import ( var _ contracts.JobArgs = (*WechatCallback)(nil) type DownloadFromAliOSS struct { - Bucket string - Path string + Path string } func (s DownloadFromAliOSS) InsertOpts() InsertOpts { @@ -33,6 +36,12 @@ var _ Worker[DownloadFromAliOSS] = (*DownloadFromAliOSSWorker)(nil) // @provider(job) type DownloadFromAliOSSWorker struct { WorkerDefaults[DownloadFromAliOSS] + + oss *ali.OSSClient +} + +func (w *DownloadFromAliOSSWorker) NextRetry(job *Job[DownloadFromAliOSS]) time.Time { + return time.Now().Add(30 * time.Second) } func (w *DownloadFromAliOSSWorker) Work(ctx context.Context, job *Job[DownloadFromAliOSS]) error { @@ -41,9 +50,37 @@ func (w *DownloadFromAliOSSWorker) Work(ctx context.Context, job *Job[DownloadFr log.Infof("[Start] Working on job with strings: %+v", job.Args) defer log.Infof("[End] Finished %s", job.Args.Kind()) + dst := filepath.Join("/Users/rogee/Projects/self/quyun/backend/fixtures/oss/", job.Args.Path) + + // check is path exist + _, err := os.Stat(dst) + if os.IsNotExist(err) { + log.Infof("File not exists: %s", dst) + // midir + err := os.MkdirAll(filepath.Dir(dst), os.ModePerm) + if err != nil { + log.Errorf("Error creating directory: %v", err) + return err + } + } + + if err == nil { + log.Infof("File already exists: %s", dst) + err := os.Remove(dst) + if err != nil { + log.Errorf("Error removing file: %v", err) + return err + } + + return nil + } + + if err := w.oss.Download(ctx, job.Args.Path, dst); err != nil { + log.Errorf("Error downloading file: %v", err) + return err + } + + log.Infof("Successfully downloaded file: %s", job.Args.Path) + return nil } - -func (w *DownloadFromAliOSSWorker) NextRetry(job *Job[DownloadFromAliOSS]) time.Time { - return time.Now().Add(30 * time.Second) -} diff --git a/backend/app/jobs/download_from_alioss_test.go b/backend/app/jobs/download_from_alioss_test.go new file mode 100644 index 0000000..b050df6 --- /dev/null +++ b/backend/app/jobs/download_from_alioss_test.go @@ -0,0 +1,58 @@ +package jobs + +import ( + "context" + "testing" + + "quyun/app/service/testx" + "quyun/providers/ali" + "quyun/providers/job" + + . "github.com/riverqueue/river" + . "github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/suite" + _ "go.ipao.vip/atom" + "go.ipao.vip/atom/contracts" + "go.uber.org/dig" +) + +type DownloadFromAliOSSSuiteInjectParams struct { + dig.In + + Initials []contracts.Initial `group:"initials"` // nolint:structcheck + Job *job.Job + Oss *ali.OSSClient +} + +type DownloadFromAliOSSSuite struct { + suite.Suite + + DownloadFromAliOSSSuiteInjectParams +} + +func Test_DownloadFromAliOSS(t *testing.T) { + providers := testx.Default().With(Provide) + + testx.Serve(providers, t, func(p DownloadFromAliOSSSuiteInjectParams) { + suite.Run(t, &DownloadFromAliOSSSuite{DownloadFromAliOSSSuiteInjectParams: p}) + }) +} + +func (t *DownloadFromAliOSSSuite) Test_Work() { + Convey("test_work", t.T(), func() { + Convey("step 1", func() { + job := &Job[DownloadFromAliOSS]{ + Args: DownloadFromAliOSS{ + Path: "quyun/959e5310105c96e653f10b74e5bdc36b.mp4", + }, + } + + worker := &DownloadFromAliOSSWorker{ + oss: t.Oss, + } + + err := worker.Work(context.Background(), job) + So(err, ShouldBeNil) + }) + }) +} diff --git a/backend/app/jobs/provider.gen.go b/backend/app/jobs/provider.gen.go index f31d50a..4982468 100755 --- a/backend/app/jobs/provider.gen.go +++ b/backend/app/jobs/provider.gen.go @@ -1,6 +1,7 @@ package jobs import ( + "quyun/providers/ali" "quyun/providers/job" "github.com/riverqueue/river" @@ -39,8 +40,11 @@ func Provide(opts ...opt.Option) error { } if err := container.Container.Provide(func( __job *job.Job, + oss *ali.OSSClient, ) (contracts.Initial, error) { - obj := &DownloadFromAliOSSWorker{} + obj := &DownloadFromAliOSSWorker{ + oss: oss, + } if err := river.AddWorkerSafely(__job.Workers, obj); err != nil { return nil, err } diff --git a/backend/app/service/testx/testing.go b/backend/app/service/testx/testing.go index 1563257..199de52 100644 --- a/backend/app/service/testx/testing.go +++ b/backend/app/service/testx/testing.go @@ -4,7 +4,9 @@ import ( "os" "testing" + "quyun/providers/ali" "quyun/providers/app" + "quyun/providers/job" "quyun/providers/postgres" "quyun/providers/wechat" @@ -18,6 +20,8 @@ import ( func Default(providers ...container.ProviderContainer) container.Providers { return append(container.Providers{ app.DefaultProvider(), + job.DefaultProvider(), + ali.DefaultProvider(), postgres.DefaultProvider(), wechat.DefaultProvider(), }, providers...) diff --git a/backend/providers/ali/config.go b/backend/providers/ali/config.go index b015e15..5f19875 100644 --- a/backend/providers/ali/config.go +++ b/backend/providers/ali/config.go @@ -40,10 +40,10 @@ func Provide(opts ...opt.Option) error { WithCredentialsProvider(cred). WithRegion(config.Region) - ossClient := oss.NewClient(cfg) return &OSSClient{ - client: ossClient, - config: &config, + client: oss.NewClient(cfg), + internalClient: oss.NewClient(cfg.WithUseInternalEndpoint(true)), + config: &config, }, nil }, o.DiOptions()...) } diff --git a/backend/providers/ali/oss_client.go b/backend/providers/ali/oss_client.go index 2fc0776..c98e942 100644 --- a/backend/providers/ali/oss_client.go +++ b/backend/providers/ali/oss_client.go @@ -2,15 +2,15 @@ package ali import ( "context" - "log" "strings" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" ) type OSSClient struct { - client *oss.Client - config *Config + client *oss.Client + internalClient *oss.Client + config *Config } func (c *OSSClient) GetClient() *oss.Client { @@ -23,6 +23,18 @@ func (c *OSSClient) PreSignUpload(ctx context.Context, path string) (*oss.Presig Key: oss.Ptr("quyun/" + strings.Trim(path, "/")), ContentType: oss.Ptr("multipart/form-data"), } - log.Printf("%+v", request) return c.client.Presign(ctx, request) } + +func (c *OSSClient) Download(ctx context.Context, path, dest string) error { + request := &oss.GetObjectRequest{ + Bucket: oss.Ptr(c.config.Bucket), + Key: oss.Ptr(path), + } + + _, err := c.client.GetObjectToFile(ctx, request, dest) + if err != nil { + return err + } + return nil +}