feat: add download from alioss job
This commit is contained in:
@@ -2,8 +2,12 @@ package jobs
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"quyun/providers/ali"
|
||||||
|
|
||||||
. "github.com/riverqueue/river"
|
. "github.com/riverqueue/river"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
_ "go.ipao.vip/atom"
|
_ "go.ipao.vip/atom"
|
||||||
@@ -14,7 +18,6 @@ import (
|
|||||||
var _ contracts.JobArgs = (*WechatCallback)(nil)
|
var _ contracts.JobArgs = (*WechatCallback)(nil)
|
||||||
|
|
||||||
type DownloadFromAliOSS struct {
|
type DownloadFromAliOSS struct {
|
||||||
Bucket string
|
|
||||||
Path string
|
Path string
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -33,6 +36,12 @@ var _ Worker[DownloadFromAliOSS] = (*DownloadFromAliOSSWorker)(nil)
|
|||||||
// @provider(job)
|
// @provider(job)
|
||||||
type DownloadFromAliOSSWorker struct {
|
type DownloadFromAliOSSWorker struct {
|
||||||
WorkerDefaults[DownloadFromAliOSS]
|
WorkerDefaults[DownloadFromAliOSS]
|
||||||
|
|
||||||
|
oss *ali.OSSClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *DownloadFromAliOSSWorker) NextRetry(job *Job[DownloadFromAliOSS]) time.Time {
|
||||||
|
return time.Now().Add(30 * time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *DownloadFromAliOSSWorker) Work(ctx context.Context, job *Job[DownloadFromAliOSS]) error {
|
func (w *DownloadFromAliOSSWorker) Work(ctx context.Context, job *Job[DownloadFromAliOSS]) error {
|
||||||
@@ -41,9 +50,37 @@ func (w *DownloadFromAliOSSWorker) Work(ctx context.Context, job *Job[DownloadFr
|
|||||||
log.Infof("[Start] Working on job with strings: %+v", job.Args)
|
log.Infof("[Start] Working on job with strings: %+v", job.Args)
|
||||||
defer log.Infof("[End] Finished %s", job.Args.Kind())
|
defer log.Infof("[End] Finished %s", job.Args.Kind())
|
||||||
|
|
||||||
|
dst := filepath.Join("/Users/rogee/Projects/self/quyun/backend/fixtures/oss/", job.Args.Path)
|
||||||
|
|
||||||
|
// check is path exist
|
||||||
|
_, err := os.Stat(dst)
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
log.Infof("File not exists: %s", dst)
|
||||||
|
// midir
|
||||||
|
err := os.MkdirAll(filepath.Dir(dst), os.ModePerm)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error creating directory: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
log.Infof("File already exists: %s", dst)
|
||||||
|
err := os.Remove(dst)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error removing file: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.oss.Download(ctx, job.Args.Path, dst); err != nil {
|
||||||
|
log.Errorf("Error downloading file: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Successfully downloaded file: %s", job.Args.Path)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *DownloadFromAliOSSWorker) NextRetry(job *Job[DownloadFromAliOSS]) time.Time {
|
|
||||||
return time.Now().Add(30 * time.Second)
|
|
||||||
}
|
|
||||||
|
|||||||
58
backend/app/jobs/download_from_alioss_test.go
Normal file
58
backend/app/jobs/download_from_alioss_test.go
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
package jobs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"quyun/app/service/testx"
|
||||||
|
"quyun/providers/ali"
|
||||||
|
"quyun/providers/job"
|
||||||
|
|
||||||
|
. "github.com/riverqueue/river"
|
||||||
|
. "github.com/smartystreets/goconvey/convey"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
_ "go.ipao.vip/atom"
|
||||||
|
"go.ipao.vip/atom/contracts"
|
||||||
|
"go.uber.org/dig"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DownloadFromAliOSSSuiteInjectParams struct {
|
||||||
|
dig.In
|
||||||
|
|
||||||
|
Initials []contracts.Initial `group:"initials"` // nolint:structcheck
|
||||||
|
Job *job.Job
|
||||||
|
Oss *ali.OSSClient
|
||||||
|
}
|
||||||
|
|
||||||
|
type DownloadFromAliOSSSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
|
||||||
|
DownloadFromAliOSSSuiteInjectParams
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_DownloadFromAliOSS(t *testing.T) {
|
||||||
|
providers := testx.Default().With(Provide)
|
||||||
|
|
||||||
|
testx.Serve(providers, t, func(p DownloadFromAliOSSSuiteInjectParams) {
|
||||||
|
suite.Run(t, &DownloadFromAliOSSSuite{DownloadFromAliOSSSuiteInjectParams: p})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *DownloadFromAliOSSSuite) Test_Work() {
|
||||||
|
Convey("test_work", t.T(), func() {
|
||||||
|
Convey("step 1", func() {
|
||||||
|
job := &Job[DownloadFromAliOSS]{
|
||||||
|
Args: DownloadFromAliOSS{
|
||||||
|
Path: "quyun/959e5310105c96e653f10b74e5bdc36b.mp4",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
worker := &DownloadFromAliOSSWorker{
|
||||||
|
oss: t.Oss,
|
||||||
|
}
|
||||||
|
|
||||||
|
err := worker.Work(context.Background(), job)
|
||||||
|
So(err, ShouldBeNil)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
package jobs
|
package jobs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"quyun/providers/ali"
|
||||||
"quyun/providers/job"
|
"quyun/providers/job"
|
||||||
|
|
||||||
"github.com/riverqueue/river"
|
"github.com/riverqueue/river"
|
||||||
@@ -39,8 +40,11 @@ func Provide(opts ...opt.Option) error {
|
|||||||
}
|
}
|
||||||
if err := container.Container.Provide(func(
|
if err := container.Container.Provide(func(
|
||||||
__job *job.Job,
|
__job *job.Job,
|
||||||
|
oss *ali.OSSClient,
|
||||||
) (contracts.Initial, error) {
|
) (contracts.Initial, error) {
|
||||||
obj := &DownloadFromAliOSSWorker{}
|
obj := &DownloadFromAliOSSWorker{
|
||||||
|
oss: oss,
|
||||||
|
}
|
||||||
if err := river.AddWorkerSafely(__job.Workers, obj); err != nil {
|
if err := river.AddWorkerSafely(__job.Workers, obj); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,9 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"quyun/providers/ali"
|
||||||
"quyun/providers/app"
|
"quyun/providers/app"
|
||||||
|
"quyun/providers/job"
|
||||||
"quyun/providers/postgres"
|
"quyun/providers/postgres"
|
||||||
"quyun/providers/wechat"
|
"quyun/providers/wechat"
|
||||||
|
|
||||||
@@ -18,6 +20,8 @@ import (
|
|||||||
func Default(providers ...container.ProviderContainer) container.Providers {
|
func Default(providers ...container.ProviderContainer) container.Providers {
|
||||||
return append(container.Providers{
|
return append(container.Providers{
|
||||||
app.DefaultProvider(),
|
app.DefaultProvider(),
|
||||||
|
job.DefaultProvider(),
|
||||||
|
ali.DefaultProvider(),
|
||||||
postgres.DefaultProvider(),
|
postgres.DefaultProvider(),
|
||||||
wechat.DefaultProvider(),
|
wechat.DefaultProvider(),
|
||||||
}, providers...)
|
}, providers...)
|
||||||
|
|||||||
@@ -40,9 +40,9 @@ func Provide(opts ...opt.Option) error {
|
|||||||
WithCredentialsProvider(cred).
|
WithCredentialsProvider(cred).
|
||||||
WithRegion(config.Region)
|
WithRegion(config.Region)
|
||||||
|
|
||||||
ossClient := oss.NewClient(cfg)
|
|
||||||
return &OSSClient{
|
return &OSSClient{
|
||||||
client: ossClient,
|
client: oss.NewClient(cfg),
|
||||||
|
internalClient: oss.NewClient(cfg.WithUseInternalEndpoint(true)),
|
||||||
config: &config,
|
config: &config,
|
||||||
}, nil
|
}, nil
|
||||||
}, o.DiOptions()...)
|
}, o.DiOptions()...)
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package ali
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"log"
|
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
|
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
|
||||||
@@ -10,6 +9,7 @@ import (
|
|||||||
|
|
||||||
type OSSClient struct {
|
type OSSClient struct {
|
||||||
client *oss.Client
|
client *oss.Client
|
||||||
|
internalClient *oss.Client
|
||||||
config *Config
|
config *Config
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -23,6 +23,18 @@ func (c *OSSClient) PreSignUpload(ctx context.Context, path string) (*oss.Presig
|
|||||||
Key: oss.Ptr("quyun/" + strings.Trim(path, "/")),
|
Key: oss.Ptr("quyun/" + strings.Trim(path, "/")),
|
||||||
ContentType: oss.Ptr("multipart/form-data"),
|
ContentType: oss.Ptr("multipart/form-data"),
|
||||||
}
|
}
|
||||||
log.Printf("%+v", request)
|
|
||||||
return c.client.Presign(ctx, request)
|
return c.client.Presign(ctx, request)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *OSSClient) Download(ctx context.Context, path, dest string) error {
|
||||||
|
request := &oss.GetObjectRequest{
|
||||||
|
Bucket: oss.Ptr(c.config.Bucket),
|
||||||
|
Key: oss.Ptr(path),
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := c.client.GetObjectToFile(ctx, request, dest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user