Files
quyun-v2/backend/app/services/common.go

631 lines
17 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package services
import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"io"
"mime/multipart"
"os"
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
"quyun/v2/app/errorx"
common_dto "quyun/v2/app/http/v1/dto"
"quyun/v2/app/requests"
"quyun/v2/database/fields"
"quyun/v2/database/models"
"quyun/v2/pkg/consts"
"quyun/v2/providers/storage"
"github.com/google/uuid"
"github.com/jackc/pgconn"
"go.ipao.vip/gen/types"
"gorm.io/gorm"
)
// @provider
type common struct {
storage *storage.Storage
}
func (s *common) Options(ctx context.Context) (*common_dto.OptionsResponse, error) {
return &common_dto.OptionsResponse{
ContentStatus: consts.ContentStatusItems(),
ContentGenre: []requests.KV{
requests.NewKV("Jingju", "京剧"),
requests.NewKV("Kunqu", "昆曲"),
requests.NewKV("Yueju", "越剧"),
requests.NewKV("Yuju", "豫剧"),
requests.NewKV("Huangmeixi", "黄梅戏"),
requests.NewKV("Pingju", "评剧"),
requests.NewKV("Qinqiang", "秦腔"),
},
}, nil
}
func (s *common) CheckHash(ctx context.Context, tenantID, userID int64, hash string) (*common_dto.UploadResult, error) {
// 优先使用路径租户,避免跨租户写入。
tenant, err := s.resolveTenant(ctx, tenantID, userID)
if err != nil {
return nil, err
}
var tid int64
if tenant != nil {
tid = tenant.ID
}
query := models.MediaAssetQuery.WithContext(ctx).Where(models.MediaAssetQuery.Hash.Eq(hash))
if tid > 0 {
query = query.Where(models.MediaAssetQuery.TenantID.Eq(tid))
}
existing, err := query.First()
if err != nil {
return nil, nil // Not found, proceed to upload
}
// Found existing file (deduplication hit)
// Check if user already has it (Logic deduplication hit)
myQuery := models.MediaAssetQuery.WithContext(ctx).
Where(models.MediaAssetQuery.Hash.Eq(hash), models.MediaAssetQuery.UserID.Eq(userID))
if tid > 0 {
myQuery = myQuery.Where(models.MediaAssetQuery.TenantID.Eq(tid))
}
myExisting, err := myQuery.First()
if err == nil {
return s.composeUploadResult(myExisting), nil
}
// Create new record for this user reusing existing ObjectKey
asset := &models.MediaAsset{
TenantID: tid,
UserID: userID,
Type: existing.Type,
Status: consts.MediaAssetStatusUploaded,
Provider: existing.Provider,
Bucket: existing.Bucket,
ObjectKey: existing.ObjectKey,
Hash: hash,
Meta: existing.Meta,
}
if err := models.MediaAssetQuery.WithContext(ctx).Create(asset); err != nil {
return nil, errorx.ErrDatabaseError.WithCause(err)
}
return s.composeUploadResult(asset), nil
}
type UploadMeta struct {
// Filename 原始文件名,用于生成对象路径。
Filename string `json:"filename"`
// Type 业务媒体类型video/audio/image 等)。
Type string `json:"type"`
// MimeType 上传文件的 MIME 类型。
MimeType string `json:"mime_type"`
// TenantID 上传所属租户ID用于归属校验。
TenantID int64 `json:"tenant_id"`
// UserID 上传发起用户ID用于归属校验。
UserID int64 `json:"user_id"`
}
func (s *common) buildObjectKey(tenant *models.Tenant, hash, filename string) string {
// 按租户维度组织对象路径quyun/<tenant_uuid>/<hash>.<ext>
tenantUUID := "public"
if tenant != nil && tenant.UUID.String() != "" {
tenantUUID = tenant.UUID.String()
}
ext := strings.ToLower(filepath.Ext(filename))
return path.Join("quyun", tenantUUID, hash+ext)
}
func (s *common) loadUploadMeta(tempDir string) (*UploadMeta, error) {
metaFile, err := os.Open(filepath.Join(tempDir, "meta.json"))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil, errorx.ErrRecordNotFound.WithCause(err).WithMsg("上传会话不存在")
}
return nil, errorx.ErrInternalError.WithCause(err)
}
defer metaFile.Close()
var meta UploadMeta
if err := json.NewDecoder(metaFile).Decode(&meta); err != nil {
return nil, errorx.ErrDataCorrupted.WithCause(err).WithMsg("上传会话元信息损坏")
}
return &meta, nil
}
func (s *common) verifyUploadOwner(meta *UploadMeta, tenantID, userID int64) error {
// 校验上传会话归属,避免同租户猜测 upload_id 进行越权操作。
if meta.TenantID != tenantID || meta.UserID != userID {
return errorx.ErrForbidden.WithMsg("无权访问该上传会话")
}
return nil
}
func (s *common) resolveTenant(ctx context.Context, tenantID, userID int64) (*models.Tenant, error) {
if tenantID > 0 {
tbl, q := models.TenantQuery.QueryContext(ctx)
tenant, err := q.Where(tbl.ID.Eq(tenantID)).First()
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, errorx.ErrRecordNotFound.WithMsg("租户不存在")
}
return nil, errorx.ErrDatabaseError.WithCause(err)
}
return tenant, nil
}
if userID == 0 {
return nil, nil
}
tbl, q := models.TenantQuery.QueryContext(ctx)
tenant, err := q.Where(tbl.UserID.Eq(userID)).First()
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, errorx.ErrDatabaseError.WithCause(err)
}
return tenant, nil
}
func (s *common) uploadTempDir(localPath string, tenantID int64, uploadID string) string {
tenantKey := "public"
if tenantID > 0 {
tenantKey = strconv.FormatInt(tenantID, 10)
}
return filepath.Join(localPath, "temp", tenantKey, uploadID)
}
func (s *common) InitUpload(ctx context.Context, tenantID, userID int64, form *common_dto.UploadInitForm) (*common_dto.UploadInitResponse, error) {
uploadID := uuid.NewString()
localPath := s.storage.Config.LocalPath
if localPath == "" {
localPath = "./storage"
}
tempDir := s.uploadTempDir(localPath, tenantID, uploadID)
if err := os.MkdirAll(tempDir, 0o755); err != nil {
return nil, errorx.ErrInternalError.WithCause(err)
}
// Save metadata
meta := UploadMeta{
Filename: form.Filename,
Type: form.Type, // Ensure form has Type
MimeType: form.MimeType,
TenantID: tenantID,
UserID: userID,
}
metaFile, _ := os.Create(filepath.Join(tempDir, "meta.json"))
json.NewEncoder(metaFile).Encode(meta)
metaFile.Close()
return &common_dto.UploadInitResponse{
UploadID: uploadID,
ChunkSize: 5 * 1024 * 1024,
}, nil
}
func (s *common) UploadPart(ctx context.Context, tenantID, userID int64, file *multipart.FileHeader, form *common_dto.UploadPartForm) error {
localPath := s.storage.Config.LocalPath
if localPath == "" {
localPath = "./storage"
}
tempDir := s.uploadTempDir(localPath, tenantID, form.UploadID)
meta, err := s.loadUploadMeta(tempDir)
if err != nil {
return err
}
if err := s.verifyUploadOwner(meta, tenantID, userID); err != nil {
return err
}
partPath := filepath.Join(tempDir, strconv.Itoa(form.PartNumber))
src, err := file.Open()
if err != nil {
return errorx.ErrInternalError.WithCause(err)
}
defer src.Close()
dst, err := os.Create(partPath)
if err != nil {
return errorx.ErrInternalError.WithCause(err)
}
defer dst.Close()
if _, err = io.Copy(dst, src); err != nil {
return errorx.ErrInternalError.WithCause(err)
}
return nil
}
func (s *common) CompleteUpload(ctx context.Context, tenantID, userID int64, form *common_dto.UploadCompleteForm) (*common_dto.UploadResult, error) {
localPath := s.storage.Config.LocalPath
if localPath == "" {
localPath = "./storage"
}
tempDir := s.uploadTempDir(localPath, tenantID, form.UploadID)
// 校验上传会话归属,避免同租户猜测 upload_id 进行越权操作。
meta, err := s.loadUploadMeta(tempDir)
if err != nil {
return nil, err
}
if err := s.verifyUploadOwner(meta, tenantID, userID); err != nil {
return nil, err
}
// List parts
entries, err := os.ReadDir(tempDir)
if err != nil {
return nil, errorx.ErrInternalError.WithCause(err)
}
var parts []int
for _, e := range entries {
if !e.IsDir() && e.Name() != "meta.json" {
if i, err := strconv.Atoi(e.Name()); err == nil {
parts = append(parts, i)
}
}
}
sort.Ints(parts)
mergedPath := filepath.Join(tempDir, "merged")
dst, err := os.Create(mergedPath)
if err != nil {
return nil, errorx.ErrInternalError.WithCause(err)
}
defer dst.Close()
hasher := md5.New()
var totalSize int64
for _, partNum := range parts {
partPath := filepath.Join(tempDir, strconv.Itoa(partNum))
src, err := os.Open(partPath)
if err != nil {
return nil, errorx.ErrInternalError.WithCause(err)
}
n, err := io.Copy(io.MultiWriter(dst, hasher), src)
src.Close()
if err != nil {
return nil, errorx.ErrInternalError.WithCause(err)
}
totalSize += n
}
hash := hex.EncodeToString(hasher.Sum(nil))
dst.Close() // Ensure flush before potential removal
// Deduplication Logic (Similar to Upload)
tenant, err := s.resolveTenant(ctx, tenantID, userID)
if err != nil {
return nil, err
}
var tid int64
if tenant != nil {
tid = tenant.ID
}
objectKey := s.buildObjectKey(tenant, hash, meta.Filename)
existingQuery := models.MediaAssetQuery.WithContext(ctx).Where(models.MediaAssetQuery.Hash.Eq(hash))
if tid > 0 {
existingQuery = existingQuery.Where(models.MediaAssetQuery.TenantID.Eq(tid))
}
existing, err := existingQuery.First()
var asset *models.MediaAsset
if err == nil {
os.Remove(mergedPath) // Delete duplicate
myQuery := models.MediaAssetQuery.WithContext(ctx).
Where(models.MediaAssetQuery.Hash.Eq(hash), models.MediaAssetQuery.UserID.Eq(userID))
if tid > 0 {
myQuery = myQuery.Where(models.MediaAssetQuery.TenantID.Eq(tid))
}
myExisting, err := myQuery.First()
if err == nil {
os.RemoveAll(tempDir)
return s.composeUploadResult(myExisting), nil
}
asset = &models.MediaAsset{
TenantID: tid,
UserID: userID,
Type: consts.MediaAssetType(meta.Type),
Status: consts.MediaAssetStatusUploaded,
Provider: existing.Provider,
Bucket: existing.Bucket,
ObjectKey: existing.ObjectKey,
Hash: hash,
Meta: existing.Meta,
}
} else {
finalPath := filepath.Join(localPath, objectKey)
if err := os.MkdirAll(filepath.Dir(finalPath), 0o755); err != nil {
return nil, errorx.ErrInternalError.WithCause(err)
}
if err := os.Rename(mergedPath, finalPath); err != nil {
return nil, errorx.ErrInternalError.WithCause(err)
}
asset = &models.MediaAsset{
TenantID: tid,
UserID: userID,
Type: consts.MediaAssetType(meta.Type),
Status: consts.MediaAssetStatusUploaded,
Provider: "local",
Bucket: "default",
ObjectKey: objectKey,
Hash: hash,
Meta: types.NewJSONType(fields.MediaAssetMeta{
Filename: meta.Filename,
Size: totalSize,
}),
}
}
os.RemoveAll(tempDir)
if err := models.MediaAssetQuery.WithContext(ctx).Create(asset); err != nil {
return nil, errorx.ErrDatabaseError.WithCause(err)
}
return s.composeUploadResult(asset), nil
}
func (s *common) DeleteMediaAsset(ctx context.Context, tenantID, userID, id int64) error {
query := models.MediaAssetQuery.WithContext(ctx).
Where(models.MediaAssetQuery.ID.Eq(id), models.MediaAssetQuery.UserID.Eq(userID))
if tenantID > 0 {
query = query.Where(models.MediaAssetQuery.TenantID.Eq(tenantID))
}
asset, err := query.First()
if err != nil {
return errorx.ErrRecordNotFound
}
// Delete DB record
if _, err := models.MediaAssetQuery.WithContext(ctx).Where(models.MediaAssetQuery.ID.Eq(id)).Delete(); err != nil {
return errorx.ErrDatabaseError.WithCause(err)
}
// Check ref count
count, _ := models.MediaAssetQuery.WithContext(ctx).
Where(models.MediaAssetQuery.ObjectKey.Eq(asset.ObjectKey)).
Count()
if count == 0 {
// Physical delete
_ = s.storage.Delete(asset.ObjectKey)
}
return nil
}
func (s *common) AbortUpload(ctx context.Context, tenantID, userID int64, uploadId string) error {
localPath := s.storage.Config.LocalPath
if localPath == "" {
localPath = "./storage"
}
tempDir := s.uploadTempDir(localPath, tenantID, uploadId)
meta, err := s.loadUploadMeta(tempDir)
if err != nil {
return err
}
if err := s.verifyUploadOwner(meta, tenantID, userID); err != nil {
return err
}
return os.RemoveAll(tempDir)
}
func (s *common) Upload(
ctx context.Context,
tenantID int64,
userID int64,
file *multipart.FileHeader,
typeArg string,
) (*common_dto.UploadResult, error) {
// But this Upload endpoint accepts file. So we save it.
// Save file content to local storage
src, err := file.Open()
if err != nil {
return nil, errorx.ErrInternalError.WithCause(err).WithMsg("failed to open uploaded file")
}
defer src.Close()
localPath := s.storage.Config.LocalPath
if localPath == "" {
localPath = "./storage" // Fallback
}
tmpDir := filepath.Join(localPath, "temp", "uploads", uuid.NewString())
if err := os.MkdirAll(tmpDir, 0o755); err != nil {
return nil, errorx.ErrInternalError.WithCause(err).WithMsg("failed to create storage directory")
}
tmpPath := filepath.Join(tmpDir, "file")
dst, err := os.Create(tmpPath)
if err != nil {
return nil, errorx.ErrInternalError.WithCause(err).WithMsg("failed to create destination file")
}
// Hash calculation while copying
hasher := md5.New()
size, err := io.Copy(io.MultiWriter(dst, hasher), src)
dst.Close() // Close immediately to allow removal if needed
if err != nil {
return nil, errorx.ErrInternalError.WithCause(err).WithMsg("failed to save file content")
}
hash := hex.EncodeToString(hasher.Sum(nil))
tenant, err := s.resolveTenant(ctx, tenantID, userID)
if err != nil {
return nil, err
}
var tid int64
if tenant != nil {
tid = tenant.ID
}
objectKey := s.buildObjectKey(tenant, hash, file.Filename)
var asset *models.MediaAsset
// Deduplication Check
existingQuery := models.MediaAssetQuery.WithContext(ctx).Where(models.MediaAssetQuery.Hash.Eq(hash))
if tid > 0 {
existingQuery = existingQuery.Where(models.MediaAssetQuery.TenantID.Eq(tid))
}
existing, err := existingQuery.First()
if err == nil {
// Found existing file (Storage Deduplication)
os.Remove(tmpPath) // Delete the duplicate we just wrote
os.RemoveAll(tmpDir)
// Check if user already has it (Logic Deduplication)
myQuery := models.MediaAssetQuery.WithContext(ctx).
Where(models.MediaAssetQuery.Hash.Eq(hash), models.MediaAssetQuery.UserID.Eq(userID))
if tid > 0 {
myQuery = myQuery.Where(models.MediaAssetQuery.TenantID.Eq(tid))
}
myExisting, err := myQuery.First()
if err == nil {
return s.composeUploadResult(myExisting), nil
}
// Create new link for user reusing existing ObjectKey
asset = &models.MediaAsset{
TenantID: tid,
UserID: userID,
Type: consts.MediaAssetType(typeArg),
Status: consts.MediaAssetStatusUploaded,
Provider: existing.Provider,
Bucket: existing.Bucket,
ObjectKey: existing.ObjectKey, // Reuse key
Hash: hash,
Meta: existing.Meta,
}
} else {
dstPath := filepath.Join(localPath, objectKey)
if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil {
return nil, errorx.ErrInternalError.WithCause(err).WithMsg("failed to create storage directory")
}
if err := os.Rename(tmpPath, dstPath); err != nil {
return nil, errorx.ErrInternalError.WithCause(err).WithMsg("failed to finalize file")
}
os.RemoveAll(tmpDir)
// New unique file
asset = &models.MediaAsset{
TenantID: tid,
UserID: userID,
Type: consts.MediaAssetType(typeArg),
Status: consts.MediaAssetStatusUploaded,
Provider: "local",
Bucket: "default",
ObjectKey: objectKey,
Hash: hash,
Meta: types.NewJSONType(fields.MediaAssetMeta{
Filename: file.Filename,
Size: size,
}),
}
}
if err := models.MediaAssetQuery.WithContext(ctx).Create(asset); err != nil {
return nil, errorx.ErrDatabaseError.WithCause(err)
}
return s.composeUploadResult(asset), nil
}
func (s *common) composeUploadResult(asset *models.MediaAsset) *common_dto.UploadResult {
url := s.GetAssetURL(asset.ObjectKey)
filename := asset.Meta.Data().Filename
if filename == "" {
filename = filepath.Base(asset.ObjectKey)
// Try to strip UUID prefix (36 chars + 1 underscore = 37)
if len(filename) > 37 && filename[36] == '_' {
filename = filename[37:]
}
}
size := asset.Meta.Data().Size
return &common_dto.UploadResult{
ID: asset.ID,
URL: url,
Filename: filename,
Size: size,
MimeType: "application/octet-stream", // TODO: Store mime type in DB
}
}
func (s *common) GetAssetURL(objectKey string) string {
if objectKey == "" {
return ""
}
url, _ := s.storage.SignURL("GET", objectKey, 1*time.Hour)
return url
}
func retryCriticalWrite(ctx context.Context, fn func() error) error {
backoffs := []time.Duration{
50 * time.Millisecond,
120 * time.Millisecond,
250 * time.Millisecond,
}
var lastErr error
for attempt := 0; attempt <= len(backoffs); attempt++ {
if err := fn(); err != nil {
if !shouldRetryWrite(err) {
return err
}
lastErr = err
if attempt >= len(backoffs) {
return err
}
// 事务冲突/死锁等短暂错误,等待后重试。
if ctx != nil {
select {
case <-ctx.Done():
return errorx.ErrServiceTimeout.WithCause(ctx.Err())
case <-time.After(backoffs[attempt]):
}
} else {
time.Sleep(backoffs[attempt])
}
continue
}
return nil
}
return lastErr
}
func shouldRetryWrite(err error) bool {
var appErr *errorx.AppError
if errors.As(err, &appErr) {
return false
}
return isTransientDBError(err)
}
func isTransientDBError(err error) bool {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
switch pgErr.Code {
case "40001", "40P01":
return true
}
}
return false
}