572 lines
16 KiB
Go
572 lines
16 KiB
Go
package services
|
||
|
||
import (
|
||
"context"
|
||
"crypto/rand"
|
||
"encoding/base32"
|
||
"encoding/json"
|
||
"errors"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"quyun/v2/app/errorx"
|
||
tenant_dto "quyun/v2/app/http/tenant/dto"
|
||
"quyun/v2/app/requests"
|
||
"quyun/v2/database/models"
|
||
"quyun/v2/pkg/consts"
|
||
|
||
pkgerrors "github.com/pkg/errors"
|
||
"github.com/samber/lo"
|
||
"github.com/sirupsen/logrus"
|
||
"go.ipao.vip/gen"
|
||
"go.ipao.vip/gen/field"
|
||
"go.ipao.vip/gen/types"
|
||
"gorm.io/gorm"
|
||
"gorm.io/gorm/clause"
|
||
)
|
||
|
||
// mediaAsset 提供媒体资源上传初始化等能力(上传/处理链路会在后续里程碑补齐)。
|
||
//
|
||
// @provider
|
||
type mediaAsset struct{}
|
||
|
||
func mediaAssetTransitionAllowed(from, to consts.MediaAssetStatus) bool {
|
||
switch from {
|
||
case consts.MediaAssetStatusUploaded:
|
||
return to == consts.MediaAssetStatusProcessing
|
||
case consts.MediaAssetStatusProcessing:
|
||
return to == consts.MediaAssetStatusReady || to == consts.MediaAssetStatusFailed
|
||
case consts.MediaAssetStatusReady, consts.MediaAssetStatusFailed:
|
||
return to == consts.MediaAssetStatusDeleted
|
||
default:
|
||
return false
|
||
}
|
||
}
|
||
|
||
func newObjectKey(tenantID, userID int64, assetType consts.MediaAssetType, now time.Time) (string, error) {
|
||
// object_key 作为存储定位的关键字段:必须由服务端生成,避免客户端路径注入与越权覆盖。
|
||
buf := make([]byte, 16) // 128-bit
|
||
if _, err := rand.Read(buf); err != nil {
|
||
return "", err
|
||
}
|
||
token := strings.ToLower(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(buf))
|
||
date := now.UTC().Format("20060102")
|
||
return "tenants/" + strconv.FormatInt(tenantID, 10) +
|
||
"/users/" + strconv.FormatInt(userID, 10) +
|
||
"/" + string(assetType) +
|
||
"/" + date +
|
||
"/" + token, nil
|
||
}
|
||
|
||
// AdminUploadInit creates a MediaAsset record and returns upload parameters.
|
||
// 当前版本为“stub 上传初始化”:只负责生成 asset 与 object_key,不对接外部存储签名。
|
||
func (s *mediaAsset) AdminUploadInit(ctx context.Context, tenantID, operatorUserID int64, form *tenant_dto.AdminMediaAssetUploadInitForm, now time.Time) (*tenant_dto.AdminMediaAssetUploadInitResponse, error) {
|
||
if tenantID <= 0 || operatorUserID <= 0 {
|
||
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/operator_user_id must be > 0")
|
||
}
|
||
if form == nil {
|
||
return nil, errorx.ErrInvalidParameter.WithMsg("form is nil")
|
||
}
|
||
if now.IsZero() {
|
||
now = time.Now()
|
||
}
|
||
|
||
typ := consts.MediaAssetType(strings.TrimSpace(form.Type))
|
||
if typ == "" || !typ.IsValid() {
|
||
return nil, errorx.ErrInvalidParameter.WithMsg("invalid type")
|
||
}
|
||
|
||
objectKey, err := newObjectKey(tenantID, operatorUserID, typ, now)
|
||
if err != nil {
|
||
return nil, pkgerrors.Wrap(err, "generate object_key failed")
|
||
}
|
||
|
||
metaMap := map[string]any{}
|
||
if form.ContentType != "" {
|
||
metaMap["content_type"] = strings.TrimSpace(form.ContentType)
|
||
}
|
||
if form.FileSize > 0 {
|
||
metaMap["file_size"] = form.FileSize
|
||
}
|
||
if form.SHA256 != "" {
|
||
metaMap["sha256"] = strings.ToLower(strings.TrimSpace(form.SHA256))
|
||
}
|
||
metaBytes, _ := json.Marshal(metaMap)
|
||
if len(metaBytes) == 0 {
|
||
metaBytes = []byte("{}")
|
||
}
|
||
|
||
m := &models.MediaAsset{
|
||
TenantID: tenantID,
|
||
UserID: operatorUserID,
|
||
Type: typ,
|
||
Status: consts.MediaAssetStatusUploaded,
|
||
Provider: "stub",
|
||
Bucket: "",
|
||
ObjectKey: objectKey,
|
||
Meta: types.JSON(metaBytes),
|
||
CreatedAt: now,
|
||
UpdatedAt: now,
|
||
}
|
||
if err := m.Create(ctx); err != nil {
|
||
return nil, pkgerrors.Wrap(err, "create media asset failed")
|
||
}
|
||
|
||
logrus.WithFields(logrus.Fields{
|
||
"tenant_id": tenantID,
|
||
"user_id": operatorUserID,
|
||
"asset_id": m.ID,
|
||
"type": typ,
|
||
"object_key": objectKey,
|
||
}).Info("services.media_asset.admin.upload_init")
|
||
|
||
// 约定:upload_url 先返回空或内部占位;后续接入真实存储签名后再补齐。
|
||
return &tenant_dto.AdminMediaAssetUploadInitResponse{
|
||
AssetID: m.ID,
|
||
Provider: m.Provider,
|
||
Bucket: m.Bucket,
|
||
ObjectKey: m.ObjectKey,
|
||
UploadURL: "",
|
||
Headers: map[string]string{},
|
||
FormFields: map[string]string{},
|
||
ExpiresAt: nil,
|
||
}, nil
|
||
}
|
||
|
||
// AdminUploadComplete marks the asset upload as completed and transitions status uploaded -> processing.
|
||
// 幂等语义:
|
||
// - 若当前已是 processing/ready/failed,则直接返回当前资源,不重复触发处理。
|
||
// - 仅允许 uploaded 状态进入 processing;其他状态返回状态冲突/前置条件失败。
|
||
func (s *mediaAsset) AdminUploadComplete(
|
||
ctx context.Context,
|
||
tenantID, operatorUserID, assetID int64,
|
||
form *tenant_dto.AdminMediaAssetUploadCompleteForm,
|
||
now time.Time,
|
||
) (*models.MediaAsset, error) {
|
||
if tenantID <= 0 || operatorUserID <= 0 || assetID <= 0 {
|
||
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/operator_user_id/asset_id must be > 0")
|
||
}
|
||
if now.IsZero() {
|
||
now = time.Now()
|
||
}
|
||
|
||
logrus.WithFields(logrus.Fields{
|
||
"tenant_id": tenantID,
|
||
"user_id": operatorUserID,
|
||
"asset_id": assetID,
|
||
}).Info("services.media_asset.admin.upload_complete")
|
||
|
||
var out models.MediaAsset
|
||
|
||
err := _db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||
var m models.MediaAsset
|
||
if err := tx.
|
||
Clauses(clause.Locking{Strength: "UPDATE"}).
|
||
Where("tenant_id = ? AND id = ?", tenantID, assetID).
|
||
First(&m).Error; err != nil {
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
return errorx.ErrRecordNotFound.WithMsg("media asset not found")
|
||
}
|
||
return err
|
||
}
|
||
|
||
// 软删除资源不允许进入处理流程。
|
||
if m.DeletedAt.Valid {
|
||
return errorx.ErrPreconditionFailed.WithMsg("media asset deleted")
|
||
}
|
||
|
||
// 幂等:重复 upload_complete 时返回现态。
|
||
switch m.Status {
|
||
case consts.MediaAssetStatusProcessing, consts.MediaAssetStatusReady, consts.MediaAssetStatusFailed:
|
||
out = m
|
||
return nil
|
||
case consts.MediaAssetStatusUploaded:
|
||
// allowed
|
||
default:
|
||
return errorx.ErrStatusConflict.WithMsg("invalid media asset status")
|
||
}
|
||
|
||
// 合并 meta(尽量不覆盖已有字段)。
|
||
meta := map[string]any{}
|
||
if len(m.Meta) > 0 {
|
||
_ = json.Unmarshal(m.Meta, &meta)
|
||
}
|
||
meta["upload_complete_at"] = now.UTC().Format(time.RFC3339Nano)
|
||
if form != nil {
|
||
if strings.TrimSpace(form.ETag) != "" {
|
||
meta["etag"] = strings.TrimSpace(form.ETag)
|
||
}
|
||
if strings.TrimSpace(form.ContentType) != "" {
|
||
meta["content_type"] = strings.TrimSpace(form.ContentType)
|
||
}
|
||
if form.FileSize > 0 {
|
||
meta["file_size"] = form.FileSize
|
||
}
|
||
if strings.TrimSpace(form.SHA256) != "" {
|
||
meta["sha256"] = strings.ToLower(strings.TrimSpace(form.SHA256))
|
||
}
|
||
}
|
||
metaBytes, _ := json.Marshal(meta)
|
||
if len(metaBytes) == 0 {
|
||
metaBytes = []byte("{}")
|
||
}
|
||
|
||
// 状态迁移:uploaded -> processing
|
||
if !mediaAssetTransitionAllowed(m.Status, consts.MediaAssetStatusProcessing) {
|
||
return errorx.ErrStatusConflict.WithMsg("invalid media asset status transition")
|
||
}
|
||
if err := tx.Model(&models.MediaAsset{}).
|
||
Where("id = ?", m.ID).
|
||
Updates(map[string]any{
|
||
"status": consts.MediaAssetStatusProcessing,
|
||
"meta": types.JSON(metaBytes),
|
||
"updated_at": now,
|
||
}).Error; err != nil {
|
||
return err
|
||
}
|
||
|
||
m.Status = consts.MediaAssetStatusProcessing
|
||
m.Meta = types.JSON(metaBytes)
|
||
m.UpdatedAt = now
|
||
out = m
|
||
|
||
// 触发异步处理(当前为 stub):后续接入队列/任务系统时在此处落任务并保持幂等。
|
||
logrus.WithFields(logrus.Fields{
|
||
"tenant_id": tenantID,
|
||
"user_id": operatorUserID,
|
||
"asset_id": assetID,
|
||
"status": m.Status,
|
||
}).Info("services.media_asset.process.triggered")
|
||
|
||
return nil
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return &out, nil
|
||
}
|
||
|
||
// ProcessSuccess marks a processing asset as ready.
|
||
// 用于异步处理链路(worker/job)回写处理结果;当前不暴露 HTTP 接口。
|
||
func (s *mediaAsset) ProcessSuccess(
|
||
ctx context.Context,
|
||
tenantID, assetID int64,
|
||
metaPatch map[string]any,
|
||
now time.Time,
|
||
) (*models.MediaAsset, error) {
|
||
if tenantID <= 0 || assetID <= 0 {
|
||
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/asset_id must be > 0")
|
||
}
|
||
if now.IsZero() {
|
||
now = time.Now()
|
||
}
|
||
|
||
var out models.MediaAsset
|
||
err := _db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||
var m models.MediaAsset
|
||
if err := tx.
|
||
Clauses(clause.Locking{Strength: "UPDATE"}).
|
||
Where("tenant_id = ? AND id = ?", tenantID, assetID).
|
||
First(&m).Error; err != nil {
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
return errorx.ErrRecordNotFound.WithMsg("media asset not found")
|
||
}
|
||
return err
|
||
}
|
||
if m.DeletedAt.Valid || m.Status == consts.MediaAssetStatusDeleted {
|
||
return errorx.ErrPreconditionFailed.WithMsg("media asset deleted")
|
||
}
|
||
if m.Status == consts.MediaAssetStatusReady {
|
||
out = m
|
||
return nil
|
||
}
|
||
if !mediaAssetTransitionAllowed(m.Status, consts.MediaAssetStatusReady) {
|
||
return errorx.ErrStatusConflict.WithMsg("invalid media asset status transition")
|
||
}
|
||
|
||
meta := map[string]any{}
|
||
if len(m.Meta) > 0 {
|
||
_ = json.Unmarshal(m.Meta, &meta)
|
||
}
|
||
for k, v := range metaPatch {
|
||
if strings.TrimSpace(k) == "" {
|
||
continue
|
||
}
|
||
meta[k] = v
|
||
}
|
||
meta["processed_at"] = now.UTC().Format(time.RFC3339Nano)
|
||
metaBytes, _ := json.Marshal(meta)
|
||
if len(metaBytes) == 0 {
|
||
metaBytes = []byte("{}")
|
||
}
|
||
|
||
if err := tx.Model(&models.MediaAsset{}).
|
||
Where("id = ?", m.ID).
|
||
Updates(map[string]any{
|
||
"status": consts.MediaAssetStatusReady,
|
||
"meta": types.JSON(metaBytes),
|
||
"updated_at": now,
|
||
}).Error; err != nil {
|
||
return err
|
||
}
|
||
m.Status = consts.MediaAssetStatusReady
|
||
m.Meta = types.JSON(metaBytes)
|
||
m.UpdatedAt = now
|
||
out = m
|
||
return nil
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return &out, nil
|
||
}
|
||
|
||
// ProcessFailed marks a processing asset as failed.
|
||
// 用于异步处理链路(worker/job)回写处理结果;当前不暴露 HTTP 接口。
|
||
func (s *mediaAsset) ProcessFailed(
|
||
ctx context.Context,
|
||
tenantID, assetID int64,
|
||
reason string,
|
||
now time.Time,
|
||
) (*models.MediaAsset, error) {
|
||
if tenantID <= 0 || assetID <= 0 {
|
||
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/asset_id must be > 0")
|
||
}
|
||
if now.IsZero() {
|
||
now = time.Now()
|
||
}
|
||
|
||
var out models.MediaAsset
|
||
err := _db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||
var m models.MediaAsset
|
||
if err := tx.
|
||
Clauses(clause.Locking{Strength: "UPDATE"}).
|
||
Where("tenant_id = ? AND id = ?", tenantID, assetID).
|
||
First(&m).Error; err != nil {
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
return errorx.ErrRecordNotFound.WithMsg("media asset not found")
|
||
}
|
||
return err
|
||
}
|
||
if m.DeletedAt.Valid || m.Status == consts.MediaAssetStatusDeleted {
|
||
return errorx.ErrPreconditionFailed.WithMsg("media asset deleted")
|
||
}
|
||
if m.Status == consts.MediaAssetStatusFailed {
|
||
out = m
|
||
return nil
|
||
}
|
||
if !mediaAssetTransitionAllowed(m.Status, consts.MediaAssetStatusFailed) {
|
||
return errorx.ErrStatusConflict.WithMsg("invalid media asset status transition")
|
||
}
|
||
|
||
meta := map[string]any{}
|
||
if len(m.Meta) > 0 {
|
||
_ = json.Unmarshal(m.Meta, &meta)
|
||
}
|
||
if strings.TrimSpace(reason) != "" {
|
||
meta["failed_reason"] = strings.TrimSpace(reason)
|
||
}
|
||
meta["failed_at"] = now.UTC().Format(time.RFC3339Nano)
|
||
metaBytes, _ := json.Marshal(meta)
|
||
if len(metaBytes) == 0 {
|
||
metaBytes = []byte("{}")
|
||
}
|
||
|
||
if err := tx.Model(&models.MediaAsset{}).
|
||
Where("id = ?", m.ID).
|
||
Updates(map[string]any{
|
||
"status": consts.MediaAssetStatusFailed,
|
||
"meta": types.JSON(metaBytes),
|
||
"updated_at": now,
|
||
}).Error; err != nil {
|
||
return err
|
||
}
|
||
m.Status = consts.MediaAssetStatusFailed
|
||
m.Meta = types.JSON(metaBytes)
|
||
m.UpdatedAt = now
|
||
out = m
|
||
return nil
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return &out, nil
|
||
}
|
||
|
||
// AdminDelete soft-deletes a media asset (ready/failed -> deleted).
|
||
func (s *mediaAsset) AdminDelete(ctx context.Context, tenantID, operatorUserID, assetID int64, now time.Time) (*models.MediaAsset, error) {
|
||
if tenantID <= 0 || operatorUserID <= 0 || assetID <= 0 {
|
||
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/operator_user_id/asset_id must be > 0")
|
||
}
|
||
if now.IsZero() {
|
||
now = time.Now()
|
||
}
|
||
|
||
logrus.WithFields(logrus.Fields{
|
||
"tenant_id": tenantID,
|
||
"user_id": operatorUserID,
|
||
"asset_id": assetID,
|
||
}).Info("services.media_asset.admin.delete")
|
||
|
||
var out models.MediaAsset
|
||
err := _db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||
var m models.MediaAsset
|
||
if err := tx.
|
||
Clauses(clause.Locking{Strength: "UPDATE"}).
|
||
Where("tenant_id = ? AND id = ?", tenantID, assetID).
|
||
First(&m).Error; err != nil {
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
return errorx.ErrRecordNotFound.WithMsg("media asset not found")
|
||
}
|
||
return err
|
||
}
|
||
|
||
// 幂等:已删除直接返回。
|
||
if m.DeletedAt.Valid || m.Status == consts.MediaAssetStatusDeleted {
|
||
out = m
|
||
return nil
|
||
}
|
||
|
||
if !mediaAssetTransitionAllowed(m.Status, consts.MediaAssetStatusDeleted) {
|
||
return errorx.ErrStatusConflict.WithMsg("invalid media asset status transition")
|
||
}
|
||
|
||
if err := tx.Model(&models.MediaAsset{}).
|
||
Where("id = ?", m.ID).
|
||
Updates(map[string]any{
|
||
"status": consts.MediaAssetStatusDeleted,
|
||
"updated_at": now,
|
||
}).Error; err != nil {
|
||
return err
|
||
}
|
||
|
||
if err := tx.Delete(&m).Error; err != nil {
|
||
return err
|
||
}
|
||
|
||
m.Status = consts.MediaAssetStatusDeleted
|
||
m.UpdatedAt = now
|
||
out = m
|
||
return nil
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return &out, nil
|
||
}
|
||
|
||
// AdminPage 分页查询租户内媒体资源(租户管理)。
|
||
func (s *mediaAsset) AdminPage(ctx context.Context, tenantID int64, filter *tenant_dto.AdminMediaAssetListFilter) (*requests.Pager, error) {
|
||
if tenantID <= 0 {
|
||
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id must be > 0")
|
||
}
|
||
if filter == nil {
|
||
filter = &tenant_dto.AdminMediaAssetListFilter{}
|
||
}
|
||
|
||
logrus.WithFields(logrus.Fields{
|
||
"tenant_id": tenantID,
|
||
"type": lo.FromPtr(filter.Type),
|
||
"status": lo.FromPtr(filter.Status),
|
||
"created_at_from": filter.CreatedAtFrom,
|
||
"created_at_to": filter.CreatedAtTo,
|
||
"sort_asc_fields": filter.AscFields(),
|
||
"sort_desc_fields": filter.DescFields(),
|
||
}).Info("services.media_asset.admin.page")
|
||
|
||
filter.Pagination.Format()
|
||
|
||
tbl, query := models.MediaAssetQuery.QueryContext(ctx)
|
||
|
||
conds := []gen.Condition{
|
||
tbl.TenantID.Eq(tenantID),
|
||
tbl.DeletedAt.IsNull(),
|
||
}
|
||
if filter.Type != nil {
|
||
conds = append(conds, tbl.Type.Eq(*filter.Type))
|
||
}
|
||
if filter.Status != nil {
|
||
conds = append(conds, tbl.Status.Eq(*filter.Status))
|
||
}
|
||
if filter.CreatedAtFrom != nil {
|
||
conds = append(conds, tbl.CreatedAt.Gte(*filter.CreatedAtFrom))
|
||
}
|
||
if filter.CreatedAtTo != nil {
|
||
conds = append(conds, tbl.CreatedAt.Lte(*filter.CreatedAtTo))
|
||
}
|
||
|
||
// 排序白名单:避免把任意字符串拼进 SQL 导致注入或慢查询。
|
||
orderBys := make([]field.Expr, 0, 4)
|
||
allowedAsc := map[string]field.Expr{
|
||
"id": tbl.ID.Asc(),
|
||
"created_at": tbl.CreatedAt.Asc(),
|
||
"updated_at": tbl.UpdatedAt.Asc(),
|
||
}
|
||
allowedDesc := map[string]field.Expr{
|
||
"id": tbl.ID.Desc(),
|
||
"created_at": tbl.CreatedAt.Desc(),
|
||
"updated_at": tbl.UpdatedAt.Desc(),
|
||
}
|
||
for _, f := range filter.AscFields() {
|
||
f = strings.TrimSpace(f)
|
||
if f == "" {
|
||
continue
|
||
}
|
||
if ob, ok := allowedAsc[f]; ok {
|
||
orderBys = append(orderBys, ob)
|
||
}
|
||
}
|
||
for _, f := range filter.DescFields() {
|
||
f = strings.TrimSpace(f)
|
||
if f == "" {
|
||
continue
|
||
}
|
||
if ob, ok := allowedDesc[f]; ok {
|
||
orderBys = append(orderBys, ob)
|
||
}
|
||
}
|
||
if len(orderBys) == 0 {
|
||
orderBys = append(orderBys, tbl.ID.Desc())
|
||
} else {
|
||
orderBys = append(orderBys, tbl.ID.Desc())
|
||
}
|
||
|
||
items, total, err := query.Where(conds...).Order(orderBys...).FindByPage(int(filter.Offset()), int(filter.Limit))
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return &requests.Pager{
|
||
Pagination: filter.Pagination,
|
||
Total: total,
|
||
Items: items,
|
||
}, nil
|
||
}
|
||
|
||
// AdminDetail 查询租户内媒体资源详情(租户管理)。
|
||
func (s *mediaAsset) AdminDetail(ctx context.Context, tenantID, assetID int64) (*models.MediaAsset, error) {
|
||
if tenantID <= 0 || assetID <= 0 {
|
||
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/asset_id must be > 0")
|
||
}
|
||
|
||
logrus.WithFields(logrus.Fields{
|
||
"tenant_id": tenantID,
|
||
"asset_id": assetID,
|
||
}).Info("services.media_asset.admin.detail")
|
||
|
||
tbl, query := models.MediaAssetQuery.QueryContext(ctx)
|
||
m, err := query.Where(
|
||
tbl.TenantID.Eq(tenantID),
|
||
tbl.ID.Eq(assetID),
|
||
tbl.DeletedAt.IsNull(),
|
||
).First()
|
||
if err != nil {
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
return nil, errorx.ErrRecordNotFound.WithMsg("media asset not found")
|
||
}
|
||
return nil, err
|
||
}
|
||
return m, nil
|
||
}
|