From 1ca75b60601b435d5fc3ac1e4ab902fe3152e8e7 Mon Sep 17 00:00:00 2001 From: yanghao05 Date: Thu, 17 Apr 2025 19:56:35 +0800 Subject: [PATCH] feat: update ali oss client --- backend/app/http/admin/provider.gen.go | 4 +- backend/app/http/admin/routes.gen.go | 21 +--- backend/app/http/admin/uploads.go | 146 +++-------------------- backend/app/jobs/download_from_alioss.go | 49 ++++++++ backend/app/jobs/provider.gen.go | 12 ++ backend/go.mod | 2 + backend/go.sum | 2 + backend/providers/ali/config.go | 33 ++--- backend/providers/ali/credential.go | 123 ------------------- backend/providers/ali/oss_client.go | 27 +++++ backend/test.http | 9 +- 11 files changed, 133 insertions(+), 295 deletions(-) create mode 100644 backend/app/jobs/download_from_alioss.go delete mode 100644 backend/providers/ali/credential.go create mode 100644 backend/providers/ali/oss_client.go diff --git a/backend/app/http/admin/provider.gen.go b/backend/app/http/admin/provider.gen.go index 045a7ad..61e8afd 100755 --- a/backend/app/http/admin/provider.gen.go +++ b/backend/app/http/admin/provider.gen.go @@ -69,12 +69,12 @@ func Provide(opts ...opt.Option) error { return err } if err := container.Container.Provide(func( - ali *ali.Config, app *app.Config, + oss *ali.OSSClient, ) (*uploads, error) { obj := &uploads{ - ali: ali, app: app, + oss: oss, } return obj, nil diff --git a/backend/app/http/admin/routes.gen.go b/backend/app/http/admin/routes.gen.go index 18e0795..cae2dc4 100644 --- a/backend/app/http/admin/routes.gen.go +++ b/backend/app/http/admin/routes.gen.go @@ -8,7 +8,6 @@ import ( _ "go.ipao.vip/atom" _ "go.ipao.vip/atom/contracts" . "go.ipao.vip/atom/fen" - "mime/multipart" "quyun/app/requests" ) @@ -82,26 +81,10 @@ func (r *Routes) Register(router fiber.Router) { )) // 注册路由组: uploads - router.Post("/v1/admin/uploads/:md5/chunks/:idx", Func3( - r.uploads.Chunks, - PathParam[string]("md5"), - PathParam[string]("idx"), - File[multipart.FileHeader]("file"), - )) - - router.Post("/v1/admin/uploads/:md5/complete", Func2( - r.uploads.Complete, - PathParam[string]("md5"), - Body[UploadFileInfo]("body"), - )) - - router.Get("/v1/admin/uploads/token", DataFunc0( - r.uploads.Token, - )) - - router.Get("/v1/admin/uploads/pre-uploaded-check/:md5", Func1( + router.Get("/v1/admin/uploads/pre-uploaded-check/:md5.:ext", DataFunc2( r.uploads.PreUploadCheck, PathParam[string]("md5"), + PathParam[string]("ext"), )) router.Post("/v1/admin/uploads/post-uploaded-action", Func1( diff --git a/backend/app/http/admin/uploads.go b/backend/app/http/admin/uploads.go index 1850642..09a1858 100644 --- a/backend/app/http/admin/uploads.go +++ b/backend/app/http/admin/uploads.go @@ -3,20 +3,16 @@ package admin import ( "errors" "fmt" - "mime/multipart" - "os" "path/filepath" - "time" "quyun/app/models" "quyun/database/schemas/public/model" - "quyun/pkg/utils" "quyun/providers/ali" "quyun/providers/app" + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" "github.com/go-jet/jet/v2/qrm" "github.com/gofiber/fiber/v3" - log "github.com/sirupsen/logrus" ) const UPLOAD_PATH = "quyun" @@ -24,140 +20,30 @@ const UPLOAD_PATH = "quyun" // @provider type uploads struct { app *app.Config - ali *ali.Config + oss *ali.OSSClient } -func (up *uploads) storagePath() string { - return filepath.Join(up.app.StoragePath, "uploads") -} - -type UploadChunk struct { - Chunk int `query:"chunk"` - Md5 string `query:"md5"` -} - -type UploadFileInfo struct { - Md5 string `json:"md5"` - Filename string `json:"filename"` - Mime string `json:"mime"` - Chunks int `json:"chunks"` -} - -// Upload chunks -// @Router /v1/admin/uploads/:md5/chunks/:idx [post] -// @Bind md5 path -// @Bind idx path -// @Bind file file -func (up *uploads) Chunks(ctx fiber.Ctx, md5, idx string, file *multipart.FileHeader) error { - tmpPath := filepath.Join(up.storagePath(), md5, idx) - - // if tmpPath not exists, create it - if _, err := os.Stat(tmpPath); os.IsNotExist(err) { - if err := os.MkdirAll(filepath.Dir(tmpPath), os.ModePerm); err != nil { - log.WithError(err).Errorf("create tmpPath failed %s", tmpPath) - return err - } - } - - // save file to tmpPath - if err := ctx.SaveFile(file, tmpPath); err != nil { - log.WithError(err).Errorf("save file to tmpPath failed %s", tmpPath) - return err - } - return nil -} - -// Complete uploads -// @Router /v1/admin/uploads/:md5/complete [post] -// @Bind md5 path -// @Bind body body -func (up *uploads) Complete(ctx fiber.Ctx, md5 string, body *UploadFileInfo) error { - // merge chunks - path := filepath.Join(up.storagePath(), md5) - defer os.RemoveAll(path) - - targetFile := filepath.Join(up.storagePath(), md5, body.Filename) - - // if targetFile not exists, create it - tf, err := os.Create(targetFile) - if err != nil { - return err - } - - for i := 0; i < body.Chunks; i++ { - tmpPath := filepath.Join(up.storagePath(), md5, fmt.Sprintf("%d", i)) - - // open chunk file - chunkFile, err := os.Open(tmpPath) - if err != nil { - tf.Close() - return err - } - - // copy chunk file to target file - if _, err := tf.ReadFrom(chunkFile); err != nil { - chunkFile.Close() - tf.Close() - return err - } - - chunkFile.Close() - } - tf.Close() - - // validate md5 - ok, err := utils.CompareFileMd5(targetFile, md5) - if err != nil { - return err - } - if !ok { - return errors.New("md5 not match") - } - - // save file to target path - targetPath := filepath.Join(up.storagePath(), md5+filepath.Ext(body.Filename)) - if err := os.Rename(targetFile, targetPath); err != nil { - return err - } - - fState, err := os.Stat(targetPath) - if err != nil { - return err - } - - model := &model.Medias{ - CreatedAt: time.Now(), - Name: body.Filename, - MimeType: body.Mime, - Size: fState.Size(), // Updated to use fState.Size() - Path: targetPath, - } - - // save to db - if err := models.Medias.Create(ctx.Context(), model); err != nil { - return err - } - - log.Infof("File %s uploaded successfully", body.Filename) - - return nil -} - -// Token -// @Router /v1/admin/uploads/token [get] -func (up *uploads) Token(ctx fiber.Ctx) (*ali.PolicyToken, error) { - return up.ali.GetToken(UPLOAD_PATH) +type PreCheckResp struct { + Exists bool `json:"exists"` + PreSign *oss.PresignResult `json:"pre_sign"` } // PreUploadCheck -// @Router /v1/admin/uploads/pre-uploaded-check/:md5 [get] +// @Router /v1/admin/uploads/pre-uploaded-check/:md5.:ext [get] // @Bind md5 path -func (up *uploads) PreUploadCheck(ctx fiber.Ctx, md5 string) error { +// @Bind ext path +func (up *uploads) PreUploadCheck(ctx fiber.Ctx, md5, ext string) (*PreCheckResp, error) { _, err := models.Medias.GetByHash(ctx.Context(), md5) if err != nil && errors.Is(err, qrm.ErrNoRows) { - return ctx.SendString("ok") + preSign, err := up.oss.PreSignUpload(ctx.Context(), fmt.Sprintf("%s%s", md5, ext)) + if err != nil { + return nil, err + } + + return &PreCheckResp{Exists: false, PreSign: preSign}, nil } - return ctx.SendString("exists") + + return &PreCheckResp{Exists: true}, nil } type PostUploadedForm struct { diff --git a/backend/app/jobs/download_from_alioss.go b/backend/app/jobs/download_from_alioss.go new file mode 100644 index 0000000..3b1f8b2 --- /dev/null +++ b/backend/app/jobs/download_from_alioss.go @@ -0,0 +1,49 @@ +package jobs + +import ( + "context" + "time" + + . "github.com/riverqueue/river" + log "github.com/sirupsen/logrus" + _ "go.ipao.vip/atom" + "go.ipao.vip/atom/contracts" + _ "go.ipao.vip/atom/contracts" +) + +var _ contracts.JobArgs = (*WechatCallback)(nil) + +type DownloadFromAliOSS struct { + Bucket string + Path string +} + +func (s DownloadFromAliOSS) InsertOpts() InsertOpts { + return InsertOpts{ + Queue: QueueDefault, + Priority: PriorityDefault, + } +} + +func (s DownloadFromAliOSS) Kind() string { return "download_from_ali_oss" } +func (a DownloadFromAliOSS) UniqueID() string { return a.Kind() } + +var _ Worker[DownloadFromAliOSS] = (*DownloadFromAliOSSWorker)(nil) + +// @provider(job) +type DownloadFromAliOSSWorker struct { + WorkerDefaults[DownloadFromAliOSS] +} + +func (w *DownloadFromAliOSSWorker) Work(ctx context.Context, job *Job[DownloadFromAliOSS]) error { + log := log.WithField("job", job.Args.Kind()) + + log.Infof("[Start] Working on job with strings: %+v", job.Args) + defer log.Infof("[End] Finished %s", job.Args.Kind()) + + return nil +} + +func (w *DownloadFromAliOSSWorker) NextRetry(job *Job[DownloadFromAliOSS]) time.Time { + return time.Now().Add(30 * time.Second) +} diff --git a/backend/app/jobs/provider.gen.go b/backend/app/jobs/provider.gen.go index 77ba79f..f31d50a 100755 --- a/backend/app/jobs/provider.gen.go +++ b/backend/app/jobs/provider.gen.go @@ -37,6 +37,18 @@ func Provide(opts ...opt.Option) error { }, atom.GroupInitial); err != nil { return err } + if err := container.Container.Provide(func( + __job *job.Job, + ) (contracts.Initial, error) { + obj := &DownloadFromAliOSSWorker{} + if err := river.AddWorkerSafely(__job.Workers, obj); err != nil { + return nil, err + } + + return obj, nil + }, atom.GroupInitial); err != nil { + return err + } if err := container.Container.Provide(func( __job *job.Job, ) (contracts.Initial, error) { diff --git a/backend/go.mod b/backend/go.mod index a2a39e8..543af3c 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -66,6 +66,7 @@ require ( github.com/Rican7/retry v0.3.1 // indirect github.com/alibabacloud-go/debug v1.0.1 // indirect github.com/alibabacloud-go/tea v1.2.2 // indirect + github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.2.1 // indirect github.com/andybalholm/brotli v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -164,6 +165,7 @@ require ( golang.org/x/mod v0.22.0 // indirect golang.org/x/sys v0.31.0 // indirect golang.org/x/text v0.23.0 // indirect + golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.29.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect diff --git a/backend/go.sum b/backend/go.sum index 020f7da..cd918a5 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -24,6 +24,8 @@ github.com/alibabacloud-go/debug v1.0.1 h1:MsW9SmUtbb1Fnt3ieC6NNZi6aEwrXfDksD4QA github.com/alibabacloud-go/debug v1.0.1/go.mod h1:8gfgZCCAC3+SCzjWtY053FrOcd4/qlH6IHTI4QyICOc= github.com/alibabacloud-go/tea v1.2.2 h1:aTsR6Rl3ANWPfqeQugPglfurloyBJY85eFy7Gc1+8oU= github.com/alibabacloud-go/tea v1.2.2/go.mod h1:CF3vOzEMAG+bR4WOql8gc2G9H3EkH3ZLAQdpmpXMgwk= +github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.2.1 h1:sOhpJdR/+lbQniznp3cYSfwQlXbVkT0ccuiZScBrI6Y= +github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.2.1/go.mod h1:FTzydeQVmR24FI0D6XWUOMKckjXehM/jgMn1xC+DA9M= github.com/aliyun/credentials-go v1.4.5 h1:O76WYKgdy1oQYYiJkERjlA2dxGuvLRrzuO2ScrtGWSk= github.com/aliyun/credentials-go v1.4.5/go.mod h1:Jm6d+xIgwJVLVWT561vy67ZRP4lPTQxMbEYRuT2Ti1U= github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= diff --git a/backend/providers/ali/config.go b/backend/providers/ali/config.go index ee0d69b..b015e15 100644 --- a/backend/providers/ali/config.go +++ b/backend/providers/ali/config.go @@ -1,6 +1,8 @@ package ali import ( + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials" "go.ipao.vip/atom/container" "go.ipao.vip/atom/opt" ) @@ -32,25 +34,16 @@ func Provide(opts ...opt.Option) error { return err } - return container.Container.Provide(func() (*Config, error) { - return &config, nil + return container.Container.Provide(func() (*OSSClient, error) { + cred := credentials.NewStaticCredentialsProvider(config.AccessKeyId, config.AccessKeySecret) + cfg := oss.LoadDefaultConfig(). + WithCredentialsProvider(cred). + WithRegion(config.Region) + + ossClient := oss.NewClient(cfg) + return &OSSClient{ + client: ossClient, + config: &config, + }, nil }, o.DiOptions()...) } - -type PolicyToken struct { - Policy string `json:"policy"` - SecurityToken string `json:"security_token"` - SignatureVersion string `json:"x_oss_signature_version"` - Credential string `json:"x_oss_credential"` - Date string `json:"x_oss_date"` - Signature string `json:"signature"` - Host string `json:"host"` - Dir string `json:"dir"` - Callback string `json:"callback"` -} - -type CallbackParam struct { - CallbackUrl string `json:"callbackUrl"` - CallbackBody string `json:"callbackBody"` - CallbackBodyType string `json:"callbackBodyType"` -} diff --git a/backend/providers/ali/credential.go b/backend/providers/ali/credential.go deleted file mode 100644 index d91414a..0000000 --- a/backend/providers/ali/credential.go +++ /dev/null @@ -1,123 +0,0 @@ -package ali - -import ( - "crypto/hmac" - "crypto/sha256" - "encoding/base64" - "encoding/hex" - "encoding/json" - "fmt" - "hash" - "io" - "strings" - "time" - - "github.com/aliyun/credentials-go/credentials" - "github.com/pkg/errors" -) - -func (c *Config) GetToken(path string) (*PolicyToken, error) { - product := "oss" - - host := fmt.Sprintf("https://%s.oss-%s.aliyuncs.com", c.Bucket, c.Region) - if c.Host != nil { - host = *c.Host - } - - // 设置上传目录 - dir := strings.TrimRight(path, "/") + "/" - - // callbackUrl为 上传回调服务器的URL,请将下面的IP和Port配置为您自己的真实信息。 - - config := new(credentials.Config). - SetType("access_key"). - SetAccessKeyId(c.AccessKeyId). - SetAccessKeySecret(c.AccessKeySecret). - SetPolicy(""). - SetRoleSessionExpiration(3600) - // SetType("ram_role_arn"). - // SetRoleArn(os.Getenv("OSS_STS_ROLE_ARN")). - // SetRoleSessionName("Role_Session_Name"). - - // 根据配置创建凭证提供器 - provider, err := credentials.NewCredential(config) - if err != nil { - return nil, errors.Wrap(err, "NewCredential fail") - } - - // 从凭证提供器获取凭证 - cred, err := provider.GetCredential() - if err != nil { - return nil, errors.Wrap(err, "GetCredential fail") - } - - // 构建policy - utcTime := time.Now().UTC() - date := utcTime.Format("20060102") - expiration := utcTime.Add(1 * time.Hour) - policyMap := map[string]any{ - "expiration": expiration.Format("2006-01-02T15:04:05.000Z"), - "conditions": []any{ - map[string]string{"bucket": c.Bucket}, - map[string]string{"x-oss-signature-version": "OSS4-HMAC-SHA256"}, - map[string]string{"x-oss-credential": fmt.Sprintf("%v/%v/%v/%v/aliyun_v4_request", *cred.AccessKeyId, date, c.Region, product)}, - map[string]string{"x-oss-date": utcTime.Format("20060102T150405Z")}, - map[string]string{"x-oss-security-token": *cred.SecurityToken}, - }, - } - - // 将policy转换为 JSON 格式 - policy, err := json.Marshal(policyMap) - if err != nil { - return nil, errors.Wrap(err, "json.Marshal fail") - } - - // 构造待签名字符串(StringToSign) - stringToSign := base64.StdEncoding.EncodeToString([]byte(policy)) - - hmacHash := func() hash.Hash { return sha256.New() } - // 构建signing key - signingKey := "aliyun_v4" + *cred.AccessKeySecret - h1 := hmac.New(hmacHash, []byte(signingKey)) - io.WriteString(h1, date) - h1Key := h1.Sum(nil) - - h2 := hmac.New(hmacHash, h1Key) - io.WriteString(h2, c.Region) - h2Key := h2.Sum(nil) - - h3 := hmac.New(hmacHash, h2Key) - io.WriteString(h3, product) - h3Key := h3.Sum(nil) - - h4 := hmac.New(hmacHash, h3Key) - io.WriteString(h4, "aliyun_v4_request") - h4Key := h4.Sum(nil) - - // 生成签名 - h := hmac.New(hmacHash, h4Key) - io.WriteString(h, stringToSign) - signature := hex.EncodeToString(h.Sum(nil)) - - var callbackParam CallbackParam - callbackParam.CallbackUrl = c.CallbackURL - callbackParam.CallbackBody = "filename=${object}&size=${size}&mimeType=${mimeType}" - callbackParam.CallbackBodyType = "application/x-www-form-urlencoded" - callback_str, err := json.Marshal(callbackParam) - if err != nil { - return nil, errors.Wrap(err, "callback json err:") - } - callbackBase64 := base64.StdEncoding.EncodeToString(callback_str) - // 构建返回给前端的表单 - return &PolicyToken{ - Policy: stringToSign, - SecurityToken: *cred.SecurityToken, - SignatureVersion: "OSS4-HMAC-SHA256", - Credential: fmt.Sprintf("%v/%v/%v/%v/aliyun_v4_request", *cred.AccessKeyId, date, c.Region, product), - Date: utcTime.UTC().Format("20060102T150405Z"), - Signature: signature, - Host: host, // 返回 OSS 上传地址 - Dir: dir, // 返回上传目录 - Callback: callbackBase64, // 返回上传回调参数 - }, nil -} diff --git a/backend/providers/ali/oss_client.go b/backend/providers/ali/oss_client.go new file mode 100644 index 0000000..d537e48 --- /dev/null +++ b/backend/providers/ali/oss_client.go @@ -0,0 +1,27 @@ +package ali + +import ( + "context" + "log" + "strings" + + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" +) + +type OSSClient struct { + client *oss.Client + config *Config +} + +func (c *OSSClient) GetClient() *oss.Client { + return c.client +} + +func (c *OSSClient) PreSignUpload(ctx context.Context, path string) (*oss.PresignResult, error) { + request := &oss.PutObjectRequest{ + Bucket: oss.Ptr(c.config.Bucket), + Key: oss.Ptr("quyun/" + strings.Trim(path, "/")), + } + log.Printf("%+v", request) + return c.client.Presign(ctx, request) +} diff --git a/backend/test.http b/backend/test.http index 9b8dfd0..be8b5bc 100644 --- a/backend/test.http +++ b/backend/test.http @@ -1,4 +1,5 @@ @host=http://localhost:8088 +@token=Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoxLCJleHAiOjE3NDU0MTExMDYsIm5iZiI6MTc0NDgwNjI5Nn0.1CshygaU01D763hElVG4j7Rj-bkOP3gawQio3xei0RQ @md5=959e5310105c96e653f10b74e5bdc36b @idx=9 @@ -58,4 +59,10 @@ Content-Type: application/json ### get orders GET {{host}}/mine HTTP/1.1 -Content-Type: application/json \ No newline at end of file +Content-Type: application/json + + +### precheck +GET {{host}}/v1/admin/uploads/pre-uploaded-check/abc.mp4 HTTP/1.1 +Content-Type: application/json +authorization: {{token}} \ No newline at end of file