feat: update ali oss client

This commit is contained in:
yanghao05
2025-04-17 19:56:35 +08:00
parent 0d0887b4fb
commit 1ca75b6060
11 changed files with 133 additions and 295 deletions

View File

@@ -69,12 +69,12 @@ func Provide(opts ...opt.Option) error {
return err
}
if err := container.Container.Provide(func(
ali *ali.Config,
app *app.Config,
oss *ali.OSSClient,
) (*uploads, error) {
obj := &uploads{
ali: ali,
app: app,
oss: oss,
}
return obj, nil

View File

@@ -8,7 +8,6 @@ import (
_ "go.ipao.vip/atom"
_ "go.ipao.vip/atom/contracts"
. "go.ipao.vip/atom/fen"
"mime/multipart"
"quyun/app/requests"
)
@@ -82,26 +81,10 @@ func (r *Routes) Register(router fiber.Router) {
))
// 注册路由组: uploads
router.Post("/v1/admin/uploads/:md5/chunks/:idx", Func3(
r.uploads.Chunks,
PathParam[string]("md5"),
PathParam[string]("idx"),
File[multipart.FileHeader]("file"),
))
router.Post("/v1/admin/uploads/:md5/complete", Func2(
r.uploads.Complete,
PathParam[string]("md5"),
Body[UploadFileInfo]("body"),
))
router.Get("/v1/admin/uploads/token", DataFunc0(
r.uploads.Token,
))
router.Get("/v1/admin/uploads/pre-uploaded-check/:md5", Func1(
router.Get("/v1/admin/uploads/pre-uploaded-check/:md5.:ext", DataFunc2(
r.uploads.PreUploadCheck,
PathParam[string]("md5"),
PathParam[string]("ext"),
))
router.Post("/v1/admin/uploads/post-uploaded-action", Func1(

View File

@@ -3,20 +3,16 @@ package admin
import (
"errors"
"fmt"
"mime/multipart"
"os"
"path/filepath"
"time"
"quyun/app/models"
"quyun/database/schemas/public/model"
"quyun/pkg/utils"
"quyun/providers/ali"
"quyun/providers/app"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/go-jet/jet/v2/qrm"
"github.com/gofiber/fiber/v3"
log "github.com/sirupsen/logrus"
)
const UPLOAD_PATH = "quyun"
@@ -24,140 +20,30 @@ const UPLOAD_PATH = "quyun"
// @provider
type uploads struct {
app *app.Config
ali *ali.Config
oss *ali.OSSClient
}
func (up *uploads) storagePath() string {
return filepath.Join(up.app.StoragePath, "uploads")
}
type UploadChunk struct {
Chunk int `query:"chunk"`
Md5 string `query:"md5"`
}
type UploadFileInfo struct {
Md5 string `json:"md5"`
Filename string `json:"filename"`
Mime string `json:"mime"`
Chunks int `json:"chunks"`
}
// Upload chunks
// @Router /v1/admin/uploads/:md5/chunks/:idx [post]
// @Bind md5 path
// @Bind idx path
// @Bind file file
func (up *uploads) Chunks(ctx fiber.Ctx, md5, idx string, file *multipart.FileHeader) error {
tmpPath := filepath.Join(up.storagePath(), md5, idx)
// if tmpPath not exists, create it
if _, err := os.Stat(tmpPath); os.IsNotExist(err) {
if err := os.MkdirAll(filepath.Dir(tmpPath), os.ModePerm); err != nil {
log.WithError(err).Errorf("create tmpPath failed %s", tmpPath)
return err
}
}
// save file to tmpPath
if err := ctx.SaveFile(file, tmpPath); err != nil {
log.WithError(err).Errorf("save file to tmpPath failed %s", tmpPath)
return err
}
return nil
}
// Complete uploads
// @Router /v1/admin/uploads/:md5/complete [post]
// @Bind md5 path
// @Bind body body
func (up *uploads) Complete(ctx fiber.Ctx, md5 string, body *UploadFileInfo) error {
// merge chunks
path := filepath.Join(up.storagePath(), md5)
defer os.RemoveAll(path)
targetFile := filepath.Join(up.storagePath(), md5, body.Filename)
// if targetFile not exists, create it
tf, err := os.Create(targetFile)
if err != nil {
return err
}
for i := 0; i < body.Chunks; i++ {
tmpPath := filepath.Join(up.storagePath(), md5, fmt.Sprintf("%d", i))
// open chunk file
chunkFile, err := os.Open(tmpPath)
if err != nil {
tf.Close()
return err
}
// copy chunk file to target file
if _, err := tf.ReadFrom(chunkFile); err != nil {
chunkFile.Close()
tf.Close()
return err
}
chunkFile.Close()
}
tf.Close()
// validate md5
ok, err := utils.CompareFileMd5(targetFile, md5)
if err != nil {
return err
}
if !ok {
return errors.New("md5 not match")
}
// save file to target path
targetPath := filepath.Join(up.storagePath(), md5+filepath.Ext(body.Filename))
if err := os.Rename(targetFile, targetPath); err != nil {
return err
}
fState, err := os.Stat(targetPath)
if err != nil {
return err
}
model := &model.Medias{
CreatedAt: time.Now(),
Name: body.Filename,
MimeType: body.Mime,
Size: fState.Size(), // Updated to use fState.Size()
Path: targetPath,
}
// save to db
if err := models.Medias.Create(ctx.Context(), model); err != nil {
return err
}
log.Infof("File %s uploaded successfully", body.Filename)
return nil
}
// Token
// @Router /v1/admin/uploads/token [get]
func (up *uploads) Token(ctx fiber.Ctx) (*ali.PolicyToken, error) {
return up.ali.GetToken(UPLOAD_PATH)
type PreCheckResp struct {
Exists bool `json:"exists"`
PreSign *oss.PresignResult `json:"pre_sign"`
}
// PreUploadCheck
// @Router /v1/admin/uploads/pre-uploaded-check/:md5 [get]
// @Router /v1/admin/uploads/pre-uploaded-check/:md5.:ext [get]
// @Bind md5 path
func (up *uploads) PreUploadCheck(ctx fiber.Ctx, md5 string) error {
// @Bind ext path
func (up *uploads) PreUploadCheck(ctx fiber.Ctx, md5, ext string) (*PreCheckResp, error) {
_, err := models.Medias.GetByHash(ctx.Context(), md5)
if err != nil && errors.Is(err, qrm.ErrNoRows) {
return ctx.SendString("ok")
preSign, err := up.oss.PreSignUpload(ctx.Context(), fmt.Sprintf("%s%s", md5, ext))
if err != nil {
return nil, err
}
return &PreCheckResp{Exists: false, PreSign: preSign}, nil
}
return ctx.SendString("exists")
return &PreCheckResp{Exists: true}, nil
}
type PostUploadedForm struct {

View File

@@ -0,0 +1,49 @@
package jobs
import (
"context"
"time"
. "github.com/riverqueue/river"
log "github.com/sirupsen/logrus"
_ "go.ipao.vip/atom"
"go.ipao.vip/atom/contracts"
_ "go.ipao.vip/atom/contracts"
)
var _ contracts.JobArgs = (*WechatCallback)(nil)
type DownloadFromAliOSS struct {
Bucket string
Path string
}
func (s DownloadFromAliOSS) InsertOpts() InsertOpts {
return InsertOpts{
Queue: QueueDefault,
Priority: PriorityDefault,
}
}
func (s DownloadFromAliOSS) Kind() string { return "download_from_ali_oss" }
func (a DownloadFromAliOSS) UniqueID() string { return a.Kind() }
var _ Worker[DownloadFromAliOSS] = (*DownloadFromAliOSSWorker)(nil)
// @provider(job)
type DownloadFromAliOSSWorker struct {
WorkerDefaults[DownloadFromAliOSS]
}
func (w *DownloadFromAliOSSWorker) Work(ctx context.Context, job *Job[DownloadFromAliOSS]) error {
log := log.WithField("job", job.Args.Kind())
log.Infof("[Start] Working on job with strings: %+v", job.Args)
defer log.Infof("[End] Finished %s", job.Args.Kind())
return nil
}
func (w *DownloadFromAliOSSWorker) NextRetry(job *Job[DownloadFromAliOSS]) time.Time {
return time.Now().Add(30 * time.Second)
}

View File

@@ -37,6 +37,18 @@ func Provide(opts ...opt.Option) error {
}, atom.GroupInitial); err != nil {
return err
}
if err := container.Container.Provide(func(
__job *job.Job,
) (contracts.Initial, error) {
obj := &DownloadFromAliOSSWorker{}
if err := river.AddWorkerSafely(__job.Workers, obj); err != nil {
return nil, err
}
return obj, nil
}, atom.GroupInitial); err != nil {
return err
}
if err := container.Container.Provide(func(
__job *job.Job,
) (contracts.Initial, error) {

View File

@@ -66,6 +66,7 @@ require (
github.com/Rican7/retry v0.3.1 // indirect
github.com/alibabacloud-go/debug v1.0.1 // indirect
github.com/alibabacloud-go/tea v1.2.2 // indirect
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.2.1 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
@@ -164,6 +165,7 @@ require (
golang.org/x/mod v0.22.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.29.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect

View File

@@ -24,6 +24,8 @@ github.com/alibabacloud-go/debug v1.0.1 h1:MsW9SmUtbb1Fnt3ieC6NNZi6aEwrXfDksD4QA
github.com/alibabacloud-go/debug v1.0.1/go.mod h1:8gfgZCCAC3+SCzjWtY053FrOcd4/qlH6IHTI4QyICOc=
github.com/alibabacloud-go/tea v1.2.2 h1:aTsR6Rl3ANWPfqeQugPglfurloyBJY85eFy7Gc1+8oU=
github.com/alibabacloud-go/tea v1.2.2/go.mod h1:CF3vOzEMAG+bR4WOql8gc2G9H3EkH3ZLAQdpmpXMgwk=
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.2.1 h1:sOhpJdR/+lbQniznp3cYSfwQlXbVkT0ccuiZScBrI6Y=
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.2.1/go.mod h1:FTzydeQVmR24FI0D6XWUOMKckjXehM/jgMn1xC+DA9M=
github.com/aliyun/credentials-go v1.4.5 h1:O76WYKgdy1oQYYiJkERjlA2dxGuvLRrzuO2ScrtGWSk=
github.com/aliyun/credentials-go v1.4.5/go.mod h1:Jm6d+xIgwJVLVWT561vy67ZRP4lPTQxMbEYRuT2Ti1U=
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=

View File

@@ -1,6 +1,8 @@
package ali
import (
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"go.ipao.vip/atom/container"
"go.ipao.vip/atom/opt"
)
@@ -32,25 +34,16 @@ func Provide(opts ...opt.Option) error {
return err
}
return container.Container.Provide(func() (*Config, error) {
return &config, nil
return container.Container.Provide(func() (*OSSClient, error) {
cred := credentials.NewStaticCredentialsProvider(config.AccessKeyId, config.AccessKeySecret)
cfg := oss.LoadDefaultConfig().
WithCredentialsProvider(cred).
WithRegion(config.Region)
ossClient := oss.NewClient(cfg)
return &OSSClient{
client: ossClient,
config: &config,
}, nil
}, o.DiOptions()...)
}
type PolicyToken struct {
Policy string `json:"policy"`
SecurityToken string `json:"security_token"`
SignatureVersion string `json:"x_oss_signature_version"`
Credential string `json:"x_oss_credential"`
Date string `json:"x_oss_date"`
Signature string `json:"signature"`
Host string `json:"host"`
Dir string `json:"dir"`
Callback string `json:"callback"`
}
type CallbackParam struct {
CallbackUrl string `json:"callbackUrl"`
CallbackBody string `json:"callbackBody"`
CallbackBodyType string `json:"callbackBodyType"`
}

View File

@@ -1,123 +0,0 @@
package ali
import (
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"hash"
"io"
"strings"
"time"
"github.com/aliyun/credentials-go/credentials"
"github.com/pkg/errors"
)
func (c *Config) GetToken(path string) (*PolicyToken, error) {
product := "oss"
host := fmt.Sprintf("https://%s.oss-%s.aliyuncs.com", c.Bucket, c.Region)
if c.Host != nil {
host = *c.Host
}
// 设置上传目录
dir := strings.TrimRight(path, "/") + "/"
// callbackUrl为 上传回调服务器的URL请将下面的IP和Port配置为您自己的真实信息。
config := new(credentials.Config).
SetType("access_key").
SetAccessKeyId(c.AccessKeyId).
SetAccessKeySecret(c.AccessKeySecret).
SetPolicy("").
SetRoleSessionExpiration(3600)
// SetType("ram_role_arn").
// SetRoleArn(os.Getenv("OSS_STS_ROLE_ARN")).
// SetRoleSessionName("Role_Session_Name").
// 根据配置创建凭证提供器
provider, err := credentials.NewCredential(config)
if err != nil {
return nil, errors.Wrap(err, "NewCredential fail")
}
// 从凭证提供器获取凭证
cred, err := provider.GetCredential()
if err != nil {
return nil, errors.Wrap(err, "GetCredential fail")
}
// 构建policy
utcTime := time.Now().UTC()
date := utcTime.Format("20060102")
expiration := utcTime.Add(1 * time.Hour)
policyMap := map[string]any{
"expiration": expiration.Format("2006-01-02T15:04:05.000Z"),
"conditions": []any{
map[string]string{"bucket": c.Bucket},
map[string]string{"x-oss-signature-version": "OSS4-HMAC-SHA256"},
map[string]string{"x-oss-credential": fmt.Sprintf("%v/%v/%v/%v/aliyun_v4_request", *cred.AccessKeyId, date, c.Region, product)},
map[string]string{"x-oss-date": utcTime.Format("20060102T150405Z")},
map[string]string{"x-oss-security-token": *cred.SecurityToken},
},
}
// 将policy转换为 JSON 格式
policy, err := json.Marshal(policyMap)
if err != nil {
return nil, errors.Wrap(err, "json.Marshal fail")
}
// 构造待签名字符串StringToSign
stringToSign := base64.StdEncoding.EncodeToString([]byte(policy))
hmacHash := func() hash.Hash { return sha256.New() }
// 构建signing key
signingKey := "aliyun_v4" + *cred.AccessKeySecret
h1 := hmac.New(hmacHash, []byte(signingKey))
io.WriteString(h1, date)
h1Key := h1.Sum(nil)
h2 := hmac.New(hmacHash, h1Key)
io.WriteString(h2, c.Region)
h2Key := h2.Sum(nil)
h3 := hmac.New(hmacHash, h2Key)
io.WriteString(h3, product)
h3Key := h3.Sum(nil)
h4 := hmac.New(hmacHash, h3Key)
io.WriteString(h4, "aliyun_v4_request")
h4Key := h4.Sum(nil)
// 生成签名
h := hmac.New(hmacHash, h4Key)
io.WriteString(h, stringToSign)
signature := hex.EncodeToString(h.Sum(nil))
var callbackParam CallbackParam
callbackParam.CallbackUrl = c.CallbackURL
callbackParam.CallbackBody = "filename=${object}&size=${size}&mimeType=${mimeType}"
callbackParam.CallbackBodyType = "application/x-www-form-urlencoded"
callback_str, err := json.Marshal(callbackParam)
if err != nil {
return nil, errors.Wrap(err, "callback json err:")
}
callbackBase64 := base64.StdEncoding.EncodeToString(callback_str)
// 构建返回给前端的表单
return &PolicyToken{
Policy: stringToSign,
SecurityToken: *cred.SecurityToken,
SignatureVersion: "OSS4-HMAC-SHA256",
Credential: fmt.Sprintf("%v/%v/%v/%v/aliyun_v4_request", *cred.AccessKeyId, date, c.Region, product),
Date: utcTime.UTC().Format("20060102T150405Z"),
Signature: signature,
Host: host, // 返回 OSS 上传地址
Dir: dir, // 返回上传目录
Callback: callbackBase64, // 返回上传回调参数
}, nil
}

View File

@@ -0,0 +1,27 @@
package ali
import (
"context"
"log"
"strings"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
)
type OSSClient struct {
client *oss.Client
config *Config
}
func (c *OSSClient) GetClient() *oss.Client {
return c.client
}
func (c *OSSClient) PreSignUpload(ctx context.Context, path string) (*oss.PresignResult, error) {
request := &oss.PutObjectRequest{
Bucket: oss.Ptr(c.config.Bucket),
Key: oss.Ptr("quyun/" + strings.Trim(path, "/")),
}
log.Printf("%+v", request)
return c.client.Presign(ctx, request)
}

View File

@@ -1,4 +1,5 @@
@host=http://localhost:8088
@token=Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoxLCJleHAiOjE3NDU0MTExMDYsIm5iZiI6MTc0NDgwNjI5Nn0.1CshygaU01D763hElVG4j7Rj-bkOP3gawQio3xei0RQ
@md5=959e5310105c96e653f10b74e5bdc36b
@idx=9
@@ -58,4 +59,10 @@ Content-Type: application/json
### get orders
GET {{host}}/mine HTTP/1.1
Content-Type: application/json
Content-Type: application/json
### precheck
GET {{host}}/v1/admin/uploads/pre-uploaded-check/abc.mp4 HTTP/1.1
Content-Type: application/json
authorization: {{token}}