feat: upload
This commit is contained in:
@@ -1,29 +1,25 @@
|
||||
package medias
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"backend/app/http/storages"
|
||||
"backend/app/http/tenants"
|
||||
"backend/database/models/qvyun_v2/public/model"
|
||||
"backend/pkg/storage"
|
||||
"backend/providers/jwt"
|
||||
|
||||
"github.com/gofiber/fiber/v3"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
uploadTempDir = "./temp/chunks" // 临时分片目录
|
||||
uploadStorageDir = "./uploads" // 最终文件存储目录
|
||||
)
|
||||
|
||||
// @provider
|
||||
type Controller struct {
|
||||
svc *Service
|
||||
log *log.Entry `inject:"false"`
|
||||
tenantSvc *tenants.Service
|
||||
svc *Service
|
||||
storageSvc *storages.Service
|
||||
log *log.Entry `inject:"false"`
|
||||
}
|
||||
|
||||
func (ctl *Controller) Prepare() error {
|
||||
@@ -32,119 +28,55 @@ func (ctl *Controller) Prepare() error {
|
||||
}
|
||||
|
||||
// Upload
|
||||
// @Router /api/v1/medias/upload [post]
|
||||
// @Router /api/v1/medias/:tenant/upload [post]
|
||||
// @Bind tenantSlug path
|
||||
// @Bind req body
|
||||
// @Bind file file
|
||||
func (ctl *Controller) Upload(ctx fiber.Ctx, file *multipart.FileHeader, req *UploadReq) (*UploadResp, error) {
|
||||
// 使用MD5创建唯一的临时目录
|
||||
tempDir := filepath.Join(uploadTempDir, req.FileMD5)
|
||||
if err := os.MkdirAll(tempDir, 0o755); err != nil {
|
||||
// @Bind claim local
|
||||
func (ctl *Controller) Upload(ctx fiber.Ctx, claim *jwt.Claims, tenantSlug string, file *multipart.FileHeader, req *UploadReq) (*storage.UploadedFile, error) {
|
||||
tenant, err := ctl.tenantSvc.GetTenantBySlug(ctx.Context(), tenantSlug)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
chunkPath := filepath.Join(tempDir, fmt.Sprintf("chunk_%d", req.ChunkNumber))
|
||||
if err := ctx.SaveFile(file, chunkPath); err != nil {
|
||||
defaultStorage, err := ctl.storageSvc.GetDefault(ctx.Context())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 如果是最后一个分片
|
||||
if req.ChunkNumber == req.TotalChunks-1 {
|
||||
// 生成唯一的文件存储路径
|
||||
ext := filepath.Ext(req.FileName)
|
||||
storageDir := filepath.Join(uploadStorageDir, time.Now().Format("2006/01/02"))
|
||||
if err := os.MkdirAll(storageDir, 0o755); err != nil {
|
||||
os.RemoveAll(tempDir)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
finalPath := filepath.Join(storageDir, req.FileMD5+ext)
|
||||
|
||||
// 计算所有分片的实际大小总和
|
||||
totalSize, err := calculateTotalSize(tempDir, req.TotalChunks)
|
||||
if err != nil {
|
||||
os.RemoveAll(tempDir)
|
||||
return nil, fmt.Errorf("计算文件大小失败: %w", err)
|
||||
}
|
||||
|
||||
// 合并文件
|
||||
if err := combineChunks(tempDir, finalPath, req.TotalChunks); err != nil {
|
||||
os.RemoveAll(tempDir)
|
||||
return nil, fmt.Errorf("合并文件失败: %w", err)
|
||||
}
|
||||
|
||||
// 验证MD5
|
||||
calculatedMD5, err := calculateFileMD5(finalPath)
|
||||
if err != nil || calculatedMD5 != req.FileMD5 {
|
||||
os.RemoveAll(tempDir)
|
||||
os.Remove(finalPath)
|
||||
return nil, errors.New("文件MD5验证失败")
|
||||
}
|
||||
|
||||
// 清理临时目录
|
||||
os.RemoveAll(tempDir)
|
||||
|
||||
return &UploadResp{
|
||||
Files: []UploadFile{
|
||||
{
|
||||
HashID: calculatedMD5,
|
||||
Name: req.FileName,
|
||||
Path: finalPath,
|
||||
Size: totalSize,
|
||||
MimeType: file.Header.Get("Content-Type"),
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &UploadResp{}, nil
|
||||
}
|
||||
|
||||
// 计算所有分片的实际大小总和
|
||||
func calculateTotalSize(tempDir string, totalChunks int) (int64, error) {
|
||||
var totalSize int64
|
||||
for i := 0; i < totalChunks; i++ {
|
||||
chunkPath := filepath.Join(tempDir, fmt.Sprintf("chunk_%d", i))
|
||||
info, err := os.Stat(chunkPath)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
totalSize += info.Size()
|
||||
}
|
||||
return totalSize, nil
|
||||
}
|
||||
|
||||
func combineChunks(tempDir, finalPath string, totalChunks int) error {
|
||||
finalFile, err := os.Create(finalPath)
|
||||
uploader, err := storage.NewUploader(req.FileName, req.ChunkNumber, req.TotalChunks, req.FileMD5)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer finalFile.Close()
|
||||
|
||||
for i := 0; i < totalChunks; i++ {
|
||||
chunkPath := fmt.Sprintf("%s/chunk_%d", tempDir, i)
|
||||
chunk, err := os.ReadFile(chunkPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := finalFile.Write(chunk); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func calculateFileMD5(filePath string) (string, error) {
|
||||
file, err := os.Open(filePath)
|
||||
uploadedFile, err := uploader.Save(ctx, file)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
hash := md5.New()
|
||||
if _, err := io.Copy(hash, file); err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return hex.EncodeToString(hash.Sum(nil)), nil
|
||||
if uploadedFile == nil {
|
||||
return uploadedFile, nil
|
||||
}
|
||||
|
||||
uploadedFile, err = storage.Build(defaultStorage).Save(ctx.Context(), uploadedFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// save to db
|
||||
_, err = ctl.svc.Create(ctx.Context(), &model.Medias{
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
TenantID: tenant.ID,
|
||||
UserID: claim.UserID,
|
||||
StorageID: defaultStorage.ID,
|
||||
Name: uploadedFile.Name,
|
||||
UUID: uploadedFile.Hash,
|
||||
MimeType: uploadedFile.MimeType,
|
||||
Size: uploadedFile.Size,
|
||||
Path: uploadedFile.Path,
|
||||
})
|
||||
uploadedFile.Preview = ""
|
||||
|
||||
return uploadedFile, err
|
||||
}
|
||||
|
||||
@@ -6,16 +6,3 @@ type UploadReq struct {
|
||||
TotalChunks int `form:"total_chunks"`
|
||||
FileMD5 string `form:"file_md5"`
|
||||
}
|
||||
|
||||
type UploadResp struct {
|
||||
Files []UploadFile `json:"files"`
|
||||
}
|
||||
|
||||
type UploadFile struct {
|
||||
HashID string `json:"hash_id"`
|
||||
Name string `json:"name"`
|
||||
Size int64 `json:"size"`
|
||||
MimeType string `json:"type"`
|
||||
Path string `json:"path"`
|
||||
Preview string `json:"preview"`
|
||||
}
|
||||
|
||||
@@ -3,6 +3,9 @@ package medias
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"backend/app/http/storages"
|
||||
"backend/app/http/tenants"
|
||||
|
||||
"git.ipao.vip/rogeecn/atom"
|
||||
"git.ipao.vip/rogeecn/atom/container"
|
||||
"git.ipao.vip/rogeecn/atom/contracts"
|
||||
@@ -11,10 +14,14 @@ import (
|
||||
|
||||
func Provide(opts ...opt.Option) error {
|
||||
if err := container.Container.Provide(func(
|
||||
storageSvc *storages.Service,
|
||||
svc *Service,
|
||||
tenantSvc *tenants.Service,
|
||||
) (*Controller, error) {
|
||||
obj := &Controller{
|
||||
svc: svc,
|
||||
storageSvc: storageSvc,
|
||||
svc: svc,
|
||||
tenantSvc: tenantSvc,
|
||||
}
|
||||
if err := obj.Prepare(); err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -4,6 +4,8 @@ package medias
|
||||
|
||||
import (
|
||||
. "backend/pkg/f"
|
||||
"backend/providers/jwt"
|
||||
"mime/multipart"
|
||||
|
||||
_ "git.ipao.vip/rogeecn/atom"
|
||||
_ "git.ipao.vip/rogeecn/atom/contracts"
|
||||
@@ -28,9 +30,11 @@ func (r *Routes) Name() string {
|
||||
|
||||
func (r *Routes) Register(router fiber.Router) {
|
||||
// 注册路由组: Controller
|
||||
router.Post("/api/v1/medias/upload", DataFunc2(
|
||||
router.Post("/api/v1/medias/:tenant/upload", DataFunc4(
|
||||
r.controller.Upload,
|
||||
File("file"),
|
||||
Local[*jwt.Claims]("claim"),
|
||||
PathParam[string]("tenantSlug"),
|
||||
File[multipart.FileHeader]("file"),
|
||||
Body[UploadReq]("req"),
|
||||
))
|
||||
|
||||
|
||||
@@ -1,10 +1,16 @@
|
||||
package medias
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"backend/database/models/qvyun_v2/public/model"
|
||||
"backend/database/models/qvyun_v2/public/table"
|
||||
"backend/providers/otel"
|
||||
|
||||
. "github.com/go-jet/jet/v2/postgres"
|
||||
log "github.com/sirupsen/logrus"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
|
||||
)
|
||||
|
||||
// @provider:except
|
||||
@@ -18,3 +24,19 @@ func (svc *Service) Prepare() error {
|
||||
_ = Int(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create
|
||||
func (svc *Service) Create(ctx context.Context, m *model.Medias) (*model.Medias, error) {
|
||||
_, span := otel.Start(ctx, "medias.service.Create")
|
||||
defer span.End()
|
||||
|
||||
tbl := table.Medias
|
||||
stmt := tbl.INSERT(tbl.MutableColumns).MODEL(m).RETURNING(tbl.AllColumns)
|
||||
span.SetAttributes(semconv.DBStatementKey.String(stmt.DebugSql()))
|
||||
|
||||
var ret model.Medias
|
||||
if err := stmt.QueryContext(ctx, svc.db, &ret); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ret, nil
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ type Storage struct {
|
||||
}
|
||||
|
||||
type CreateStorageReq struct {
|
||||
Name string `json:"name"`
|
||||
Config string `json:"config"`
|
||||
Type fields.StorageType `json:"type"`
|
||||
Name string `json:"name"`
|
||||
Config fields.StorageConfig `json:"config"`
|
||||
Type fields.StorageType `json:"type"`
|
||||
}
|
||||
|
||||
@@ -122,3 +122,19 @@ func (svc *Service) SetDefault(ctx context.Context, id int64) error {
|
||||
// Commit transaction
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// GetDefault
|
||||
func (svc *Service) GetDefault(ctx context.Context) (*model.Storages, error) {
|
||||
_, span := otel.Start(ctx, "storages.service.GetDefault")
|
||||
defer span.End()
|
||||
|
||||
tbl := table.Storages
|
||||
stmt := tbl.SELECT(tbl.AllColumns).WHERE(tbl.IsDefault.EQ(Bool(true)))
|
||||
span.SetAttributes(semconv.DBStatementKey.String(stmt.DebugSql()))
|
||||
|
||||
var storage model.Storages
|
||||
if err := stmt.QueryContext(ctx, svc.db, &storage); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &storage, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user