feat: chunk uploads

This commit is contained in:
Rogee
2025-01-15 00:26:10 +08:00
parent 591771ce77
commit 9bfdf0e0ea
16 changed files with 451 additions and 40 deletions

View File

@@ -1,10 +1,25 @@
package medias
import (
"github.com/gofiber/fiber"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
"mime/multipart"
"os"
"path/filepath"
"time"
"github.com/gofiber/fiber/v3"
log "github.com/sirupsen/logrus"
)
const (
uploadTempDir = "./temp/chunks" // 临时分片目录
uploadStorageDir = "./uploads" // 最终文件存储目录
)
// @provider
type Controller struct {
svc *Service
@@ -19,6 +34,117 @@ func (ctl *Controller) Prepare() error {
// Upload
// @Router /api/v1/medias/upload [post]
// @Bind req body
func (ctl *Controller) Upload(ctx fiber.Ctx, req *UploadReq) (*UploadResp, error) {
return ctl.svc.Upload(ctx.Context(), req)
// @Bind file file
func (ctl *Controller) Upload(ctx fiber.Ctx, file *multipart.FileHeader, req *UploadReq) (*UploadResp, error) {
// 使用MD5创建唯一的临时目录
tempDir := filepath.Join(uploadTempDir, req.FileMD5)
if err := os.MkdirAll(tempDir, 0o755); err != nil {
return nil, err
}
chunkPath := filepath.Join(tempDir, fmt.Sprintf("chunk_%d", req.ChunkNumber))
if err := ctx.SaveFile(file, chunkPath); err != nil {
return nil, err
}
// 如果是最后一个分片
if req.ChunkNumber == req.TotalChunks-1 {
// 生成唯一的文件存储路径
ext := filepath.Ext(req.FileName)
storageDir := filepath.Join(uploadStorageDir, time.Now().Format("2006/01/02"))
if err := os.MkdirAll(storageDir, 0o755); err != nil {
os.RemoveAll(tempDir)
return nil, err
}
finalPath := filepath.Join(storageDir, req.FileMD5+ext)
// 计算所有分片的实际大小总和
totalSize, err := calculateTotalSize(tempDir, req.TotalChunks)
if err != nil {
os.RemoveAll(tempDir)
return nil, fmt.Errorf("计算文件大小失败: %w", err)
}
// 合并文件
if err := combineChunks(tempDir, finalPath, req.TotalChunks); err != nil {
os.RemoveAll(tempDir)
return nil, fmt.Errorf("合并文件失败: %w", err)
}
// 验证MD5
calculatedMD5, err := calculateFileMD5(finalPath)
if err != nil || calculatedMD5 != req.FileMD5 {
os.RemoveAll(tempDir)
os.Remove(finalPath)
return nil, errors.New("文件MD5验证失败")
}
// 清理临时目录
os.RemoveAll(tempDir)
return &UploadResp{
Files: []UploadFile{
{
HashID: calculatedMD5,
Name: req.FileName,
Path: finalPath,
Size: totalSize,
MimeType: file.Header.Get("Content-Type"),
},
},
}, nil
}
return &UploadResp{}, nil
}
// 计算所有分片的实际大小总和
func calculateTotalSize(tempDir string, totalChunks int) (int64, error) {
var totalSize int64
for i := 0; i < totalChunks; i++ {
chunkPath := filepath.Join(tempDir, fmt.Sprintf("chunk_%d", i))
info, err := os.Stat(chunkPath)
if err != nil {
return 0, err
}
totalSize += info.Size()
}
return totalSize, nil
}
func combineChunks(tempDir, finalPath string, totalChunks int) error {
finalFile, err := os.Create(finalPath)
if err != nil {
return err
}
defer finalFile.Close()
for i := 0; i < totalChunks; i++ {
chunkPath := fmt.Sprintf("%s/chunk_%d", tempDir, i)
chunk, err := os.ReadFile(chunkPath)
if err != nil {
return err
}
if _, err := finalFile.Write(chunk); err != nil {
return err
}
}
return nil
}
func calculateFileMD5(filePath string) (string, error) {
file, err := os.Open(filePath)
if err != nil {
return "", err
}
defer file.Close()
hash := md5.New()
if _, err := io.Copy(hash, file); err != nil {
return "", err
}
return hex.EncodeToString(hash.Sum(nil)), nil
}

View File

@@ -1,7 +1,10 @@
package medias
type UploadReq struct {
Files []string `json:"files"`
FileName string `form:"file_name"`
ChunkNumber int `form:"chunk_number"`
TotalChunks int `form:"total_chunks"`
FileMD5 string `form:"file_md5"`
}
type UploadResp struct {

View File

@@ -0,0 +1,56 @@
package medias
import (
"database/sql"
"git.ipao.vip/rogeecn/atom"
"git.ipao.vip/rogeecn/atom/container"
"git.ipao.vip/rogeecn/atom/contracts"
"git.ipao.vip/rogeecn/atom/utils/opt"
)
func Provide(opts ...opt.Option) error {
if err := container.Container.Provide(func(
svc *Service,
) (*Controller, error) {
obj := &Controller{
svc: svc,
}
if err := obj.Prepare(); err != nil {
return nil, err
}
return obj, nil
}); err != nil {
return err
}
if err := container.Container.Provide(func(
controller *Controller,
) (contracts.HttpRoute, error) {
obj := &Routes{
controller: controller,
}
if err := obj.Prepare(); err != nil {
return nil, err
}
return obj, nil
}, atom.GroupRoutes); err != nil {
return err
}
if err := container.Container.Provide(func(
db *sql.DB,
) (*Service, error) {
obj := &Service{
db: db,
}
if err := obj.Prepare(); err != nil {
return nil, err
}
return obj, nil
}); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,37 @@
// Code generated by the atomctl ; DO NOT EDIT.
package medias
import (
. "backend/pkg/f"
_ "git.ipao.vip/rogeecn/atom"
_ "git.ipao.vip/rogeecn/atom/contracts"
"github.com/gofiber/fiber/v3"
log "github.com/sirupsen/logrus"
)
// @provider contracts.HttpRoute atom.GroupRoutes
type Routes struct {
log *log.Entry `inject:"false"`
controller *Controller
}
func (r *Routes) Prepare() error {
r.log = log.WithField("module", "routes.medias")
return nil
}
func (r *Routes) Name() string {
return "medias"
}
func (r *Routes) Register(router fiber.Router) {
// 注册路由组: Controller
router.Post("/api/v1/medias/upload", DataFunc2(
r.controller.Upload,
File("file"),
Body[UploadReq]("req"),
))
}

View File

@@ -3,7 +3,6 @@ package middlewares
import (
"backend/providers/app"
"backend/providers/jwt"
"backend/providers/storage"
"backend/providers/wechat"
log "github.com/sirupsen/logrus"
@@ -13,10 +12,9 @@ import (
type Middlewares struct {
log *log.Entry `inject:"false"`
app *app.Config
storagePath *storage.Config
jwt *jwt.JWT
client *wechat.Client
app *app.Config
jwt *jwt.JWT
client *wechat.Client
}
func (f *Middlewares) Prepare() error {

View File

@@ -3,7 +3,6 @@ package middlewares
import (
"backend/providers/app"
"backend/providers/jwt"
"backend/providers/storage"
"backend/providers/wechat"
"git.ipao.vip/rogeecn/atom/container"
@@ -15,13 +14,11 @@ func Provide(opts ...opt.Option) error {
app *app.Config,
client *wechat.Client,
jwt *jwt.JWT,
storagePath *storage.Config,
) (*Middlewares, error) {
obj := &Middlewares{
app: app,
client: client,
jwt: jwt,
storagePath: storagePath,
app: app,
client: client,
jwt: jwt,
}
if err := obj.Prepare(); err != nil {
return nil, err

View File

@@ -3,6 +3,13 @@ package http
import (
"backend/app/errorx"
"backend/app/events/subscribers"
"backend/app/http/auth"
"backend/app/http/medias"
"backend/app/http/orders"
"backend/app/http/posts"
"backend/app/http/storages"
"backend/app/http/tenants"
"backend/app/http/users"
"backend/app/jobs"
"backend/app/middlewares"
"backend/app/service"
@@ -14,7 +21,9 @@ import (
"backend/providers/http/swagger"
"backend/providers/job"
"backend/providers/jwt"
"backend/providers/pay"
"backend/providers/postgres"
"backend/providers/wechat"
"git.ipao.vip/rogeecn/atom"
"git.ipao.vip/rogeecn/atom/container"
@@ -46,6 +55,18 @@ func Command() atom.Option {
With(
jobs.Provide,
subscribers.Provide,
middlewares.Provide,
wechat.Provide,
pay.Provide,
).
With(
users.Provide,
tenants.Provide,
posts.Provide,
orders.Provide,
auth.Provide,
medias.Provide,
storages.Provide,
),
),
)

View File

@@ -31,10 +31,9 @@ Asset = "/projects/qvyun/frontend/dist"
[Pay]
[Pay.WeChat]
AppId = "wx45745a8c51091ae0"
MechID = ""
SubMechID = ""
SerialNo = ""
ApiV3Key = ""
PrivateKey = ""
WechatAppId = "wx45745a8c51091ae0"
WechatMechID = ""
WechatSubMechID = ""
WechatSerialNo = ""
WechatApiV3Key = ""
WechatPrivateKey = ""

View File

@@ -0,0 +1,19 @@
### Upload File
POST http://localhost:9600/api/v1/medias/upload
Content-Type: multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW
------WebKitFormBoundary7MA4YWxkTrZu0gW
Content-Disposition: form-data; name="chunk_number"
1
------WebKitFormBoundary7MA4YWxkTrZu0gW
Content-Disposition: form-data; name="total_chunks"
1
------WebKitFormBoundary7MA4YWxkTrZu0gW
Content-Disposition: form-data; name="file"; filename="1.png"
Content-Type: image/png
< ./upload.png
------WebKitFormBoundary7MA4YWxkTrZu0gW--

BIN
backend/fixtures/upload.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 184 KiB

View File

@@ -1,10 +1,18 @@
package f
import (
"mime/multipart"
"github.com/gofiber/fiber/v3"
"github.com/pkg/errors"
)
func File(key string) func(fiber.Ctx) (*multipart.FileHeader, error) {
return func(ctx fiber.Ctx) (*multipart.FileHeader, error) {
return ctx.FormFile(key)
}
}
func Local[T any](key string) func(fiber.Ctx) (T, error) {
return func(ctx fiber.Ctx) (T, error) {
v := fiber.Locals[T](ctx, key)

View File

@@ -17,14 +17,10 @@ func DefaultProvider() container.ProviderContainer {
}
type Config struct {
WeChat *wechatPay
}
type wechatPay struct {
AppId string
MechID string
SubMechID string
SerialNo string
ApiV3Key string
PrivateKey string
WechatAppId string
WechatMechID string
WechatSubMechID string
WechatSerialNo string
WechatApiV3Key string
WechatPrivateKey string
}

View File

@@ -16,11 +16,12 @@ func Provide(opts ...opt.Option) error {
return err
}
return container.Container.Provide(func(app *app.Config) (*Client, error) {
return nil, nil
wechatPay, err := wechat.NewClientV3(
config.WeChat.MechID,
config.WeChat.SerialNo,
config.WeChat.ApiV3Key,
config.WeChat.PrivateKey,
config.WechatMechID,
config.WechatSerialNo,
config.WechatApiV3Key,
config.WechatPrivateKey,
)
if err != nil {
return nil, err

View File

@@ -16,9 +16,9 @@ func (client *Client) WeChat_JSApiPayRequest(ctx context.Context, payerOpenID, o
bm := make(gopay.BodyMap)
bm.
Set("sp_appid", client.conf.WeChat.AppId).
Set("sp_mchid", client.conf.WeChat.MechID).
Set("sub_mchid", client.conf.WeChat.SubMechID).
Set("sp_appid", client.conf.WechatAppId).
Set("sp_mchid", client.conf.WechatMechID).
Set("sub_mchid", client.conf.WechatSubMechID).
Set("description", title).
Set("out_trade_no", orderNo).
Set("time_expire", expire).
@@ -45,5 +45,5 @@ func (client *Client) WeChat_JSApiPayRequest(ctx context.Context, payerOpenID, o
return nil, errors.New("获取预支付ID失败")
}
return client.WeChat.PaySignOfJSAPI(client.conf.WeChat.AppId, resp.Response.PrepayId)
return client.WeChat.PaySignOfJSAPI(client.conf.WechatAppId, resp.Response.PrepayId)
}

View File

@@ -0,0 +1,140 @@
<template>
<div>
<FileUpload :customUpload="true" @uploader="handleUpload" multiple :maxFileSize="50000000">
<template #empty>
<p>拖拽文件到此处上传</p>
</template>
</FileUpload>
<ProgressBar v-if="progress > 0" :value="progress" />
<small v-if="progress > 0">{{ status }}</small>
</div>
</template>
<script setup>
import SparkMD5 from 'spark-md5';
import { ref } from 'vue';
const CHUNK_SIZE = 2 * 1024 * 1024; // 2MB chunks
const progress = ref(0);
const status = ref('');
const createChunks = (file) => {
const chunks = [];
let start = 0;
while (start < file.size) {
chunks.push(file.slice(start, start + CHUNK_SIZE));
start += CHUNK_SIZE;
}
return chunks;
};
const calculateFileMD5 = async (file) => {
status.value = '计算文件MD5...';
progress.value = 0;
const chunkSize = 2097152; // 2MB
const chunks = Math.ceil(file.size / chunkSize);
const spark = new SparkMD5.ArrayBuffer();
const fileReader = new FileReader();
let currentChunk = 0;
const readNextChunk = () => {
const start = currentChunk * chunkSize;
const end = Math.min(file.size, start + chunkSize);
fileReader.readAsArrayBuffer(file.slice(start, end));
};
return new Promise((resolve, reject) => {
fileReader.onload = (e) => {
spark.append(e.target.result);
currentChunk++;
progress.value = Math.round((currentChunk / chunks) * 100);
if (currentChunk < chunks) {
readNextChunk();
} else {
const md5 = spark.end();
progress.value = 0;
resolve(md5);
}
};
fileReader.onerror = () => {
reject('MD5计算失败');
};
readNextChunk();
});
};
const MAX_CONCURRENT_UPLOADS = 3;
const uploadChunks = async (chunks, file, fileMD5) => {
const pending = [...Array(chunks.length).keys()];
const uploading = new Set();
const completed = new Set();
while (pending.length > 0 || uploading.size > 0) {
while (uploading.size < MAX_CONCURRENT_UPLOADS && pending.length > 0) {
const index = pending.shift();
uploading.add(index);
const formData = new FormData();
formData.append('file', chunks[index]);
formData.append('fileName', file.name);
formData.append('chunkNumber', index);
formData.append('totalChunks', chunks.length);
formData.append('fileMD5', fileMD5);
uploadChunk(formData, index)
.then(() => {
uploading.delete(index);
completed.add(index);
progress.value = Math.round((completed.size / chunks.length) * 100);
})
.catch((error) => {
uploading.delete(index);
pending.push(index);
console.error(`Chunk ${index} failed:`, error);
});
}
await new Promise(resolve => setTimeout(resolve, 100));
}
};
const uploadChunk = async (formData, index) => {
const response = await fetch('/api/v1/medias/upload', {
method: 'POST',
body: formData
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
return response.json();
};
const handleUpload = async (event) => {
const files = event.files;
for (const file of files) {
try {
const fileMD5 = await calculateFileMD5(file);
status.value = '开始上传文件...';
const chunks = createChunks(file);
await uploadChunks(chunks, file, fileMD5);
status.value = '上传完成';
setTimeout(() => {
progress.value = 0;
status.value = '';
}, 2000);
} catch (error) {
console.error('Upload failed:', error);
status.value = '上传失败: ' + error;
}
}
};
</script>

10
qvyun.code-workspace Normal file
View File

@@ -0,0 +1,10 @@
{
"folders": [
{
"path": "frontend"
},
{
"path": "backend"
}
]
}