feat: 添加媒体资产上传初始化和完成相关API接口及数据结构

This commit is contained in:
2025-12-22 17:02:53 +08:00
parent 6ab65817d8
commit 76f639b3f3
12 changed files with 959 additions and 19 deletions

View File

@@ -0,0 +1,228 @@
package services
import (
"context"
"crypto/rand"
"encoding/base32"
"encoding/json"
"errors"
"strconv"
"strings"
"time"
"quyun/v2/app/errorx"
"quyun/v2/app/http/tenant/dto"
"quyun/v2/database/models"
"quyun/v2/pkg/consts"
pkgerrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.ipao.vip/gen/types"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// mediaAsset 提供媒体资源上传初始化等能力(上传/处理链路会在后续里程碑补齐)。
//
// @provider
type mediaAsset struct{}
func newObjectKey(tenantID, userID int64, assetType consts.MediaAssetType, now time.Time) (string, error) {
// object_key 作为存储定位的关键字段:必须由服务端生成,避免客户端路径注入与越权覆盖。
buf := make([]byte, 16) // 128-bit
if _, err := rand.Read(buf); err != nil {
return "", err
}
token := strings.ToLower(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(buf))
date := now.UTC().Format("20060102")
return "tenants/" + strconv.FormatInt(tenantID, 10) +
"/users/" + strconv.FormatInt(userID, 10) +
"/" + string(assetType) +
"/" + date +
"/" + token, nil
}
// AdminUploadInit creates a MediaAsset record and returns upload parameters.
// 当前版本为“stub 上传初始化”:只负责生成 asset 与 object_key不对接外部存储签名。
func (s *mediaAsset) AdminUploadInit(ctx context.Context, tenantID, operatorUserID int64, form *dto.AdminMediaAssetUploadInitForm, now time.Time) (*dto.AdminMediaAssetUploadInitResponse, error) {
if tenantID <= 0 || operatorUserID <= 0 {
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/operator_user_id must be > 0")
}
if form == nil {
return nil, errorx.ErrInvalidParameter.WithMsg("form is nil")
}
if now.IsZero() {
now = time.Now()
}
typ := consts.MediaAssetType(strings.TrimSpace(form.Type))
if typ == "" || !typ.IsValid() {
return nil, errorx.ErrInvalidParameter.WithMsg("invalid type")
}
objectKey, err := newObjectKey(tenantID, operatorUserID, typ, now)
if err != nil {
return nil, pkgerrors.Wrap(err, "generate object_key failed")
}
metaMap := map[string]any{}
if form.ContentType != "" {
metaMap["content_type"] = strings.TrimSpace(form.ContentType)
}
if form.FileSize > 0 {
metaMap["file_size"] = form.FileSize
}
if form.SHA256 != "" {
metaMap["sha256"] = strings.ToLower(strings.TrimSpace(form.SHA256))
}
metaBytes, _ := json.Marshal(metaMap)
if len(metaBytes) == 0 {
metaBytes = []byte("{}")
}
m := &models.MediaAsset{
TenantID: tenantID,
UserID: operatorUserID,
Type: typ,
Status: consts.MediaAssetStatusUploaded,
Provider: "stub",
Bucket: "",
ObjectKey: objectKey,
Meta: types.JSON(metaBytes),
CreatedAt: now,
UpdatedAt: now,
}
if err := m.Create(ctx); err != nil {
return nil, pkgerrors.Wrap(err, "create media asset failed")
}
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"user_id": operatorUserID,
"asset_id": m.ID,
"type": typ,
"object_key": objectKey,
}).Info("services.media_asset.admin.upload_init")
// 约定upload_url 先返回空或内部占位;后续接入真实存储签名后再补齐。
return &dto.AdminMediaAssetUploadInitResponse{
AssetID: m.ID,
Provider: m.Provider,
Bucket: m.Bucket,
ObjectKey: m.ObjectKey,
UploadURL: "",
Headers: map[string]string{},
FormFields: map[string]string{},
ExpiresAt: nil,
}, nil
}
// AdminUploadComplete marks the asset upload as completed and transitions status uploaded -> processing.
// 幂等语义:
// - 若当前已是 processing/ready/failed则直接返回当前资源不重复触发处理。
// - 仅允许 uploaded 状态进入 processing其他状态返回状态冲突/前置条件失败。
func (s *mediaAsset) AdminUploadComplete(
ctx context.Context,
tenantID, operatorUserID, assetID int64,
form *dto.AdminMediaAssetUploadCompleteForm,
now time.Time,
) (*models.MediaAsset, error) {
if tenantID <= 0 || operatorUserID <= 0 || assetID <= 0 {
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/operator_user_id/asset_id must be > 0")
}
if now.IsZero() {
now = time.Now()
}
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"user_id": operatorUserID,
"asset_id": assetID,
}).Info("services.media_asset.admin.upload_complete")
var out models.MediaAsset
err := _db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var m models.MediaAsset
if err := tx.
Clauses(clause.Locking{Strength: "UPDATE"}).
Where("tenant_id = ? AND id = ?", tenantID, assetID).
First(&m).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errorx.ErrRecordNotFound.WithMsg("media asset not found")
}
return err
}
// 软删除资源不允许进入处理流程。
if m.DeletedAt.Valid {
return errorx.ErrPreconditionFailed.WithMsg("media asset deleted")
}
// 幂等:重复 upload_complete 时返回现态。
switch m.Status {
case consts.MediaAssetStatusProcessing, consts.MediaAssetStatusReady, consts.MediaAssetStatusFailed:
out = m
return nil
case consts.MediaAssetStatusUploaded:
// allowed
default:
return errorx.ErrStatusConflict.WithMsg("invalid media asset status")
}
// 合并 meta尽量不覆盖已有字段
meta := map[string]any{}
if len(m.Meta) > 0 {
_ = json.Unmarshal(m.Meta, &meta)
}
meta["upload_complete_at"] = now.UTC().Format(time.RFC3339Nano)
if form != nil {
if strings.TrimSpace(form.ETag) != "" {
meta["etag"] = strings.TrimSpace(form.ETag)
}
if strings.TrimSpace(form.ContentType) != "" {
meta["content_type"] = strings.TrimSpace(form.ContentType)
}
if form.FileSize > 0 {
meta["file_size"] = form.FileSize
}
if strings.TrimSpace(form.SHA256) != "" {
meta["sha256"] = strings.ToLower(strings.TrimSpace(form.SHA256))
}
}
metaBytes, _ := json.Marshal(meta)
if len(metaBytes) == 0 {
metaBytes = []byte("{}")
}
// 状态迁移uploaded -> processing
if err := tx.Model(&models.MediaAsset{}).
Where("id = ?", m.ID).
Updates(map[string]any{
"status": consts.MediaAssetStatusProcessing,
"meta": types.JSON(metaBytes),
"updated_at": now,
}).Error; err != nil {
return err
}
m.Status = consts.MediaAssetStatusProcessing
m.Meta = types.JSON(metaBytes)
m.UpdatedAt = now
out = m
// 触发异步处理(当前为 stub后续接入队列/任务系统时在此处落任务并保持幂等。
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"user_id": operatorUserID,
"asset_id": assetID,
"status": m.Status,
}).Info("services.media_asset.process.triggered")
return nil
})
if err != nil {
return nil, err
}
return &out, nil
}

View File

@@ -27,6 +27,13 @@ func Provide(opts ...opt.Option) error {
}); err != nil {
return err
}
if err := container.Container.Provide(func() (*mediaAsset, error) {
obj := &mediaAsset{}
return obj, nil
}); err != nil {
return err
}
if err := container.Container.Provide(func(
db *gorm.DB,
ledger *ledger,
@@ -44,19 +51,23 @@ func Provide(opts ...opt.Option) error {
content *content,
db *gorm.DB,
ledger *ledger,
mediaAsset *mediaAsset,
order *order,
tenant *tenant,
tenantJoin *tenantJoin,
test *test,
user *user,
) (contracts.Initial, error) {
obj := &services{
content: content,
db: db,
ledger: ledger,
order: order,
tenant: tenant,
test: test,
user: user,
content: content,
db: db,
ledger: ledger,
mediaAsset: mediaAsset,
order: order,
tenant: tenant,
tenantJoin: tenantJoin,
test: test,
user: user,
}
if err := obj.Prepare(); err != nil {
return nil, err
@@ -73,6 +84,13 @@ func Provide(opts ...opt.Option) error {
}); err != nil {
return err
}
if err := container.Container.Provide(func() (*tenantJoin, error) {
obj := &tenantJoin{}
return obj, nil
}); err != nil {
return err
}
if err := container.Container.Provide(func() (*test, error) {
obj := &test{}

View File

@@ -8,24 +8,28 @@ var _db *gorm.DB
// exported CamelCase Services
var (
Content *content
Ledger *ledger
Order *order
Tenant *tenant
Test *test
User *user
Content *content
Ledger *ledger
MediaAsset *mediaAsset
Order *order
Tenant *tenant
TenantJoin *tenantJoin
Test *test
User *user
)
// @provider(model)
type services struct {
db *gorm.DB
// define Services
content *content
ledger *ledger
order *order
tenant *tenant
test *test
user *user
content *content
ledger *ledger
mediaAsset *mediaAsset
order *order
tenant *tenant
tenantJoin *tenantJoin
test *test
user *user
}
func (svc *services) Prepare() error {
@@ -34,8 +38,10 @@ func (svc *services) Prepare() error {
// set exported Services here
Content = svc.content
Ledger = svc.ledger
MediaAsset = svc.mediaAsset
Order = svc.order
Tenant = svc.tenant
TenantJoin = svc.tenantJoin
Test = svc.test
User = svc.user

View File

@@ -23,6 +23,12 @@ import (
"gorm.io/gorm/clause"
)
// tenantJoin 提供“加入租户”域相关能力(占位服务)。
// 当前 join 相关实现复用在 `tenant` service 上,以保持对外 API 不变;此处仅用于服务汇总/注入。
//
// @provider
type tenantJoin struct{}
func isUniqueViolation(err error) bool {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {