// Source: quyun-v2/backend/app/services/media_asset.go
//
// NOTE: the code viewer this listing was copied from flagged the file as
// containing ambiguous Unicode characters (characters that might be confused
// with other characters).

package services
import (
"context"
"crypto/rand"
"encoding/base32"
"encoding/json"
"errors"
"strconv"
"strings"
"time"
"quyun/v2/app/errorx"
tenant_dto "quyun/v2/app/http/tenant/dto"
"quyun/v2/app/requests"
"quyun/v2/database/models"
"quyun/v2/pkg/consts"
pkgerrors "github.com/pkg/errors"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"go.ipao.vip/gen"
"go.ipao.vip/gen/field"
"go.ipao.vip/gen/types"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// mediaAsset provides media-asset capabilities such as upload initialization
// (the upload/processing pipeline will be completed in later milestones).
//
// @provider
type mediaAsset struct{}
// Variant names accepted by AdminUploadInit.
const (
// mediaAssetVariantMain is the variant used when the upload form leaves the variant empty.
mediaAssetVariantMain = "main"
// mediaAssetVariantPreview is the variant that must reference a main source asset.
mediaAssetVariantPreview = "preview"
)
// mediaAssetTransitionAllowed reports whether a media asset may move from
// status from to status to.
//
// The permitted transitions are:
//   - uploaded   -> processing
//   - processing -> ready | failed
//   - ready      -> deleted
//   - failed     -> deleted
//
// Every other pair (including any unknown status) is rejected.
func mediaAssetTransitionAllowed(from, to consts.MediaAssetStatus) bool {
	// Dispatch on the target status and check which origin may reach it.
	switch to {
	case consts.MediaAssetStatusProcessing:
		return from == consts.MediaAssetStatusUploaded
	case consts.MediaAssetStatusReady, consts.MediaAssetStatusFailed:
		return from == consts.MediaAssetStatusProcessing
	case consts.MediaAssetStatusDeleted:
		return from == consts.MediaAssetStatusReady || from == consts.MediaAssetStatusFailed
	}
	return false
}
// newObjectKey builds the storage object key for a new media asset.
//
// The key has the form
//
//	tenants/<tenantID>/users/<userID>/<assetType>/<YYYYMMDD in UTC>/<token>
//
// where token is 16 random bytes from crypto/rand, base32-encoded without
// padding and lowercased. An error is returned only when reading random bytes
// fails.
func newObjectKey(tenantID, userID int64, assetType consts.MediaAssetType, now time.Time) (string, error) {
	// object_key is the field that locates the object in storage, so it must
	// be generated server-side to avoid client path injection and
	// unauthorized overwrites.
	var entropy [16]byte // 128 bits
	if _, err := rand.Read(entropy[:]); err != nil {
		return "", err
	}
	encoding := base32.StdEncoding.WithPadding(base32.NoPadding)
	token := strings.ToLower(encoding.EncodeToString(entropy[:]))
	day := now.UTC().Format("20060102")

	var key strings.Builder
	key.WriteString("tenants/")
	key.WriteString(strconv.FormatInt(tenantID, 10))
	key.WriteString("/users/")
	key.WriteString(strconv.FormatInt(userID, 10))
	key.WriteString("/")
	key.WriteString(string(assetType))
	key.WriteString("/")
	key.WriteString(day)
	key.WriteString("/")
	key.WriteString(token)
	return key.String(), nil
}
// AdminUploadInit creates a MediaAsset record and returns upload parameters.
//
// The current version is a "stub" upload initialization: it only creates the
// asset row and its object_key and does not obtain a signature from any
// external storage, so UploadURL is empty and Headers/FormFields are empty
// maps.
//
// Parameters:
//   - tenantID, operatorUserID: must both be > 0.
//   - form: required. Type must be a valid media asset type. Variant is
//     trimmed and lowercased and defaults to "main". SourceAssetID is required
//     for the "preview" variant and rejected for the "main" variant.
//   - now: timestamp used for created_at/updated_at and for the date segment
//     of the object key; the zero value falls back to time.Now().
//
// Errors: errorx.ErrInvalidParameter for bad input, errorx.ErrRecordNotFound
// when the preview's source asset does not exist in the tenant,
// errorx.ErrPreconditionFailed when the source is not a main variant, and a
// wrapped error when key generation, meta encoding, or persistence fails.
func (s *mediaAsset) AdminUploadInit(ctx context.Context, tenantID, operatorUserID int64, form *tenant_dto.AdminMediaAssetUploadInitForm, now time.Time) (*tenant_dto.AdminMediaAssetUploadInitResponse, error) {
	if tenantID <= 0 || operatorUserID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/operator_user_id must be > 0")
	}
	if form == nil {
		return nil, errorx.ErrInvalidParameter.WithMsg("form is nil")
	}
	if now.IsZero() {
		now = time.Now()
	}

	typ := consts.MediaAssetType(strings.TrimSpace(form.Type))
	if typ == "" || !typ.IsValid() {
		return nil, errorx.ErrInvalidParameter.WithMsg("invalid type")
	}

	variant := strings.TrimSpace(strings.ToLower(form.Variant))
	if variant == "" {
		variant = mediaAssetVariantMain
	}
	if variant != mediaAssetVariantMain && variant != mediaAssetVariantPreview {
		return nil, errorx.ErrInvalidParameter.WithMsg("invalid variant")
	}

	var sourceAssetID int64
	if form.SourceAssetID != nil {
		sourceAssetID = *form.SourceAssetID
	}
	if variant == mediaAssetVariantMain {
		if sourceAssetID != 0 {
			return nil, errorx.ErrInvalidParameter.WithMsg("source_asset_id is only allowed for preview variant")
		}
	} else {
		// preview variant: requires a source main asset for traceability.
		if sourceAssetID <= 0 {
			return nil, errorx.ErrInvalidParameter.WithMsg("source_asset_id is required for preview variant")
		}
		// Verify that the source asset exists, belongs to the same tenant,
		// is not deleted, and is a main variant.
		var srcRow struct {
			Variant string `gorm:"column:variant"`
		}
		if err := _db.WithContext(ctx).
			Table(models.TableNameMediaAsset).
			Select("variant").
			Where("tenant_id = ? AND id = ? AND deleted_at IS NULL", tenantID, sourceAssetID).
			Take(&srcRow).Error; err != nil {
			if errors.Is(err, gorm.ErrRecordNotFound) {
				return nil, errorx.ErrRecordNotFound.WithMsg("source media asset not found")
			}
			return nil, err
		}
		// An empty stored variant is treated as "main".
		srcVariant := srcRow.Variant
		if srcVariant == "" {
			srcVariant = mediaAssetVariantMain
		}
		if srcVariant != mediaAssetVariantMain {
			return nil, errorx.ErrPreconditionFailed.WithMsg("source asset must be main variant")
		}
	}

	objectKey, err := newObjectKey(tenantID, operatorUserID, typ, now)
	if err != nil {
		return nil, pkgerrors.Wrap(err, "generate object_key failed")
	}

	// Trim before testing for emptiness so that whitespace-only values are
	// not persisted as empty strings (the same rule AdminUploadComplete uses).
	metaMap := map[string]any{}
	if contentType := strings.TrimSpace(form.ContentType); contentType != "" {
		metaMap["content_type"] = contentType
	}
	if form.FileSize > 0 {
		metaMap["file_size"] = form.FileSize
	}
	if sha256 := strings.ToLower(strings.TrimSpace(form.SHA256)); sha256 != "" {
		metaMap["sha256"] = sha256
	}
	metaBytes, err := json.Marshal(metaMap)
	if err != nil {
		return nil, pkgerrors.Wrap(err, "marshal media asset meta failed")
	}

	m := &models.MediaAsset{
		TenantID:  tenantID,
		UserID:    operatorUserID,
		Type:      typ,
		Status:    consts.MediaAssetStatusUploaded,
		Provider:  "stub",
		Bucket:    "",
		ObjectKey: objectKey,
		Meta:      types.JSON(metaBytes),
		CreatedAt: now,
		UpdatedAt: now,
	}
	if err := m.Create(ctx); err != nil {
		return nil, pkgerrors.Wrap(err, "create media asset failed")
	}

	// variant/source_asset_id are newly added DB columns; because the models
	// are generated code, the column values are written with a follow-up SQL
	// update.
	// NOTE(review): the create and this update are separate statements, so a
	// failure here appears to leave a row without variant/source_asset_id —
	// confirm whether they should share a transaction.
	updates := map[string]any{
		"variant": variant,
	}
	if sourceAssetID > 0 {
		updates["source_asset_id"] = sourceAssetID
	}
	if err := _db.WithContext(ctx).
		Model(&models.MediaAsset{}).
		Where("id = ? AND tenant_id = ?", m.ID, tenantID).
		Updates(updates).Error; err != nil {
		return nil, pkgerrors.Wrap(err, "update media asset variant/source_asset_id failed")
	}

	logrus.WithFields(logrus.Fields{
		"tenant_id":  tenantID,
		"user_id":    operatorUserID,
		"asset_id":   m.ID,
		"type":       typ,
		"variant":    variant,
		"source_id":  sourceAssetID,
		"object_key": objectKey,
	}).Info("services.media_asset.admin.upload_init")

	// Convention: upload_url is returned empty for now; it will be filled in
	// once real storage signing is integrated.
	return &tenant_dto.AdminMediaAssetUploadInitResponse{
		AssetID:    m.ID,
		Provider:   m.Provider,
		Bucket:     m.Bucket,
		ObjectKey:  m.ObjectKey,
		UploadURL:  "",
		Headers:    map[string]string{},
		FormFields: map[string]string{},
		ExpiresAt:  nil,
	}, nil
}
// AdminUploadComplete marks the asset upload as completed and transitions
// its status uploaded -> processing.
//
// Idempotency:
//   - If the asset is already processing/ready/failed, the current row is
//     returned unchanged and processing is not triggered again.
//   - Only the uploaded status may enter processing; any other status yields
//     errorx.ErrStatusConflict, and a soft-deleted asset yields
//     errorx.ErrPreconditionFailed.
//
// Parameters:
//   - tenantID, operatorUserID, assetID: must all be > 0.
//   - form: optional; non-empty (after trimming) ETag/ContentType/SHA256 and a
//     positive FileSize are merged into the asset's meta.
//   - now: timestamp used for updated_at and meta["upload_complete_at"]; the
//     zero value falls back to time.Now().
//
// The row is read with a FOR UPDATE lock inside a transaction. Returns the
// asset as it stands after the call, or an error (errorx.ErrRecordNotFound
// when the asset does not exist in the tenant).
func (s *mediaAsset) AdminUploadComplete(
	ctx context.Context,
	tenantID, operatorUserID, assetID int64,
	form *tenant_dto.AdminMediaAssetUploadCompleteForm,
	now time.Time,
) (*models.MediaAsset, error) {
	if tenantID <= 0 || operatorUserID <= 0 || assetID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/operator_user_id/asset_id must be > 0")
	}
	if now.IsZero() {
		now = time.Now()
	}

	logrus.WithFields(logrus.Fields{
		"tenant_id": tenantID,
		"user_id":   operatorUserID,
		"asset_id":  assetID,
	}).Info("services.media_asset.admin.upload_complete")

	var out models.MediaAsset
	err := _db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		var m models.MediaAsset
		if err := tx.
			Clauses(clause.Locking{Strength: "UPDATE"}).
			Where("tenant_id = ? AND id = ?", tenantID, assetID).
			First(&m).Error; err != nil {
			if errors.Is(err, gorm.ErrRecordNotFound) {
				return errorx.ErrRecordNotFound.WithMsg("media asset not found")
			}
			return err
		}

		// A soft-deleted asset must not enter the processing flow.
		if m.DeletedAt.Valid {
			return errorx.ErrPreconditionFailed.WithMsg("media asset deleted")
		}

		// Idempotent: a repeated upload_complete returns the current state.
		switch m.Status {
		case consts.MediaAssetStatusProcessing, consts.MediaAssetStatusReady, consts.MediaAssetStatusFailed:
			out = m
			return nil
		case consts.MediaAssetStatusUploaded:
			// allowed
		default:
			return errorx.ErrStatusConflict.WithMsg("invalid media asset status")
		}

		// Merge into the existing meta, keeping fields that are not being
		// updated. Decoding is best-effort: undecodable meta is replaced by a
		// fresh map. A stored JSON `null` decodes to a nil map, which must
		// also be replaced or the writes below would panic.
		meta := map[string]any{}
		if len(m.Meta) > 0 {
			if err := json.Unmarshal(m.Meta, &meta); err != nil || meta == nil {
				meta = map[string]any{}
			}
		}
		meta["upload_complete_at"] = now.UTC().Format(time.RFC3339Nano)
		if form != nil {
			if etag := strings.TrimSpace(form.ETag); etag != "" {
				meta["etag"] = etag
			}
			if contentType := strings.TrimSpace(form.ContentType); contentType != "" {
				meta["content_type"] = contentType
			}
			if form.FileSize > 0 {
				meta["file_size"] = form.FileSize
			}
			if sha256 := strings.TrimSpace(form.SHA256); sha256 != "" {
				meta["sha256"] = strings.ToLower(sha256)
			}
		}
		metaBytes, err := json.Marshal(meta)
		if err != nil {
			return pkgerrors.Wrap(err, "marshal media asset meta failed")
		}

		// Status transition: uploaded -> processing.
		if !mediaAssetTransitionAllowed(m.Status, consts.MediaAssetStatusProcessing) {
			return errorx.ErrStatusConflict.WithMsg("invalid media asset status transition")
		}
		if err := tx.Model(&models.MediaAsset{}).
			Where("id = ?", m.ID).
			Updates(map[string]any{
				"status":     consts.MediaAssetStatusProcessing,
				"meta":       types.JSON(metaBytes),
				"updated_at": now,
			}).Error; err != nil {
			return err
		}

		// Mirror the persisted changes on the in-memory copy that is returned.
		m.Status = consts.MediaAssetStatusProcessing
		m.Meta = types.JSON(metaBytes)
		m.UpdatedAt = now
		out = m

		// Trigger asynchronous processing. This is currently a stub that only
		// logs; when a queue/task system is integrated, the task should be
		// enqueued here and kept idempotent.
		logrus.WithFields(logrus.Fields{
			"tenant_id": tenantID,
			"user_id":   operatorUserID,
			"asset_id":  assetID,
			"status":    m.Status,
		}).Info("services.media_asset.process.triggered")
		return nil
	})
	if err != nil {
		return nil, err
	}
	return &out, nil
}
// ProcessSuccess marks a processing asset as ready.
//
// It is meant for the asynchronous processing pipeline (worker/job) to write
// back its result; it is currently not exposed through an HTTP endpoint.
//
// Parameters:
//   - tenantID, assetID: must both be > 0.
//   - metaPatch: entries merged over the asset's existing meta; keys that are
//     blank after trimming are skipped. meta["processed_at"] is always set
//     last, so it overrides any value supplied in the patch.
//   - now: timestamp used for updated_at and processed_at; the zero value
//     falls back to time.Now().
//
// The call is idempotent for an asset that is already ready (the row is
// returned unchanged). A deleted asset yields errorx.ErrPreconditionFailed,
// any status other than processing yields errorx.ErrStatusConflict, and a
// metaPatch value that cannot be JSON-encoded aborts the transaction with a
// wrapped error instead of overwriting the stored meta.
func (s *mediaAsset) ProcessSuccess(
	ctx context.Context,
	tenantID, assetID int64,
	metaPatch map[string]any,
	now time.Time,
) (*models.MediaAsset, error) {
	if tenantID <= 0 || assetID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/asset_id must be > 0")
	}
	if now.IsZero() {
		now = time.Now()
	}

	var out models.MediaAsset
	err := _db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		var m models.MediaAsset
		if err := tx.
			Clauses(clause.Locking{Strength: "UPDATE"}).
			Where("tenant_id = ? AND id = ?", tenantID, assetID).
			First(&m).Error; err != nil {
			if errors.Is(err, gorm.ErrRecordNotFound) {
				return errorx.ErrRecordNotFound.WithMsg("media asset not found")
			}
			return err
		}

		if m.DeletedAt.Valid || m.Status == consts.MediaAssetStatusDeleted {
			return errorx.ErrPreconditionFailed.WithMsg("media asset deleted")
		}
		// Idempotent: an asset that is already ready is returned as-is.
		if m.Status == consts.MediaAssetStatusReady {
			out = m
			return nil
		}
		if !mediaAssetTransitionAllowed(m.Status, consts.MediaAssetStatusReady) {
			return errorx.ErrStatusConflict.WithMsg("invalid media asset status transition")
		}

		// Decoding the stored meta is best-effort: undecodable meta is
		// replaced by a fresh map. A stored JSON `null` decodes to a nil map,
		// which must also be replaced or the writes below would panic.
		meta := map[string]any{}
		if len(m.Meta) > 0 {
			if err := json.Unmarshal(m.Meta, &meta); err != nil || meta == nil {
				meta = map[string]any{}
			}
		}
		for k, v := range metaPatch {
			if strings.TrimSpace(k) == "" {
				continue
			}
			meta[k] = v
		}
		meta["processed_at"] = now.UTC().Format(time.RFC3339Nano)

		// metaPatch values come from the caller and may not be encodable;
		// fail rather than silently wiping the stored meta.
		metaBytes, err := json.Marshal(meta)
		if err != nil {
			return pkgerrors.Wrap(err, "marshal media asset meta failed")
		}

		if err := tx.Model(&models.MediaAsset{}).
			Where("id = ?", m.ID).
			Updates(map[string]any{
				"status":     consts.MediaAssetStatusReady,
				"meta":       types.JSON(metaBytes),
				"updated_at": now,
			}).Error; err != nil {
			return err
		}

		// Mirror the persisted changes on the in-memory copy that is returned.
		m.Status = consts.MediaAssetStatusReady
		m.Meta = types.JSON(metaBytes)
		m.UpdatedAt = now
		out = m
		return nil
	})
	if err != nil {
		return nil, err
	}
	return &out, nil
}
// ProcessFailed marks a processing asset as failed.
//
// It is meant for the asynchronous processing pipeline (worker/job) to write
// back its result; it is currently not exposed through an HTTP endpoint.
//
// Parameters:
//   - tenantID, assetID: must both be > 0.
//   - reason: stored (trimmed) as meta["failed_reason"] when not blank.
//   - now: timestamp used for updated_at and meta["failed_at"]; the zero value
//     falls back to time.Now().
//
// The call is idempotent for an asset that is already failed (the row is
// returned unchanged). A deleted asset yields errorx.ErrPreconditionFailed
// and any status other than processing yields errorx.ErrStatusConflict.
func (s *mediaAsset) ProcessFailed(
	ctx context.Context,
	tenantID, assetID int64,
	reason string,
	now time.Time,
) (*models.MediaAsset, error) {
	if tenantID <= 0 || assetID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/asset_id must be > 0")
	}
	if now.IsZero() {
		now = time.Now()
	}

	var out models.MediaAsset
	err := _db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		var m models.MediaAsset
		if err := tx.
			Clauses(clause.Locking{Strength: "UPDATE"}).
			Where("tenant_id = ? AND id = ?", tenantID, assetID).
			First(&m).Error; err != nil {
			if errors.Is(err, gorm.ErrRecordNotFound) {
				return errorx.ErrRecordNotFound.WithMsg("media asset not found")
			}
			return err
		}

		if m.DeletedAt.Valid || m.Status == consts.MediaAssetStatusDeleted {
			return errorx.ErrPreconditionFailed.WithMsg("media asset deleted")
		}
		// Idempotent: an asset that is already failed is returned as-is.
		if m.Status == consts.MediaAssetStatusFailed {
			out = m
			return nil
		}
		if !mediaAssetTransitionAllowed(m.Status, consts.MediaAssetStatusFailed) {
			return errorx.ErrStatusConflict.WithMsg("invalid media asset status transition")
		}

		// Decoding the stored meta is best-effort: undecodable meta is
		// replaced by a fresh map. A stored JSON `null` decodes to a nil map,
		// which must also be replaced or the writes below would panic.
		meta := map[string]any{}
		if len(m.Meta) > 0 {
			if err := json.Unmarshal(m.Meta, &meta); err != nil || meta == nil {
				meta = map[string]any{}
			}
		}
		if trimmed := strings.TrimSpace(reason); trimmed != "" {
			meta["failed_reason"] = trimmed
		}
		meta["failed_at"] = now.UTC().Format(time.RFC3339Nano)

		metaBytes, err := json.Marshal(meta)
		if err != nil {
			return pkgerrors.Wrap(err, "marshal media asset meta failed")
		}

		if err := tx.Model(&models.MediaAsset{}).
			Where("id = ?", m.ID).
			Updates(map[string]any{
				"status":     consts.MediaAssetStatusFailed,
				"meta":       types.JSON(metaBytes),
				"updated_at": now,
			}).Error; err != nil {
			return err
		}

		// Mirror the persisted changes on the in-memory copy that is returned.
		m.Status = consts.MediaAssetStatusFailed
		m.Meta = types.JSON(metaBytes)
		m.UpdatedAt = now
		out = m
		return nil
	})
	if err != nil {
		return nil, err
	}
	return &out, nil
}
// AdminDelete deletes a media asset, moving it ready/failed -> deleted.
//
// tenantID, operatorUserID and assetID must all be > 0; a zero now falls back
// to time.Now(). Inside one transaction the row is locked FOR UPDATE, its
// status and updated_at are written, and then the row is deleted through
// gorm's Delete. An asset that is already deleted is returned unchanged; a
// status from which deletion is not allowed yields errorx.ErrStatusConflict,
// and a missing asset yields errorx.ErrRecordNotFound.
func (s *mediaAsset) AdminDelete(ctx context.Context, tenantID, operatorUserID, assetID int64, now time.Time) (*models.MediaAsset, error) {
	if tenantID <= 0 || operatorUserID <= 0 || assetID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/operator_user_id/asset_id must be > 0")
	}
	if now.IsZero() {
		now = time.Now()
	}

	logFields := logrus.Fields{
		"tenant_id": tenantID,
		"user_id":   operatorUserID,
		"asset_id":  assetID,
	}
	logrus.WithFields(logFields).Info("services.media_asset.admin.delete")

	var result models.MediaAsset
	deleteInTx := func(tx *gorm.DB) error {
		var asset models.MediaAsset
		lookupErr := tx.
			Clauses(clause.Locking{Strength: "UPDATE"}).
			Where("tenant_id = ? AND id = ?", tenantID, assetID).
			First(&asset).Error
		switch {
		case errors.Is(lookupErr, gorm.ErrRecordNotFound):
			return errorx.ErrRecordNotFound.WithMsg("media asset not found")
		case lookupErr != nil:
			return lookupErr
		}

		// Idempotent: an asset that is already deleted is returned directly.
		alreadyDeleted := asset.DeletedAt.Valid || asset.Status == consts.MediaAssetStatusDeleted
		if alreadyDeleted {
			result = asset
			return nil
		}

		if !mediaAssetTransitionAllowed(asset.Status, consts.MediaAssetStatusDeleted) {
			return errorx.ErrStatusConflict.WithMsg("invalid media asset status transition")
		}

		// Record the status change first, then delete the row.
		changes := map[string]any{
			"status":     consts.MediaAssetStatusDeleted,
			"updated_at": now,
		}
		if err := tx.Model(&models.MediaAsset{}).Where("id = ?", asset.ID).Updates(changes).Error; err != nil {
			return err
		}
		if err := tx.Delete(&asset).Error; err != nil {
			return err
		}

		// Mirror the persisted changes on the in-memory copy that is returned.
		asset.Status = consts.MediaAssetStatusDeleted
		asset.UpdatedAt = now
		result = asset
		return nil
	}

	if err := _db.WithContext(ctx).Transaction(deleteInTx); err != nil {
		return nil, err
	}
	return &result, nil
}
// AdminPage returns one page of the tenant's non-deleted media assets
// (tenant administration).
//
// tenantID must be > 0. A nil filter is treated as an empty filter. The
// optional filter fields Type, Status, CreatedAtFrom and CreatedAtTo narrow
// the result (the created_at bounds are inclusive). Sort fields are honoured
// only when they are in the whitelist {id, created_at, updated_at}; ascending
// fields are applied before descending ones, and id DESC is always appended
// as the last ordering term.
//
// Returns a Pager carrying the normalized pagination, the total row count
// and the page items, or the query error.
func (s *mediaAsset) AdminPage(ctx context.Context, tenantID int64, filter *tenant_dto.AdminMediaAssetListFilter) (*requests.Pager, error) {
	if tenantID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id must be > 0")
	}
	if filter == nil {
		filter = &tenant_dto.AdminMediaAssetListFilter{}
	}

	logrus.WithFields(logrus.Fields{
		"tenant_id":        tenantID,
		"type":             lo.FromPtr(filter.Type),
		"status":           lo.FromPtr(filter.Status),
		"created_at_from":  filter.CreatedAtFrom,
		"created_at_to":    filter.CreatedAtTo,
		"sort_asc_fields":  filter.AscFields(),
		"sort_desc_fields": filter.DescFields(),
	}).Info("services.media_asset.admin.page")

	filter.Pagination.Format()

	tbl, query := models.MediaAssetQuery.QueryContext(ctx)

	conds := []gen.Condition{
		tbl.TenantID.Eq(tenantID),
		tbl.DeletedAt.IsNull(),
	}
	if filter.Type != nil {
		conds = append(conds, tbl.Type.Eq(*filter.Type))
	}
	if filter.Status != nil {
		conds = append(conds, tbl.Status.Eq(*filter.Status))
	}
	if filter.CreatedAtFrom != nil {
		conds = append(conds, tbl.CreatedAt.Gte(*filter.CreatedAtFrom))
	}
	if filter.CreatedAtTo != nil {
		conds = append(conds, tbl.CreatedAt.Lte(*filter.CreatedAtTo))
	}

	// Sort whitelist: never splice arbitrary strings into SQL, which could
	// cause injection or slow queries. Names outside the whitelist (including
	// blank ones) are simply ignored.
	allowedAsc := map[string]field.Expr{
		"id":         tbl.ID.Asc(),
		"created_at": tbl.CreatedAt.Asc(),
		"updated_at": tbl.UpdatedAt.Asc(),
	}
	allowedDesc := map[string]field.Expr{
		"id":         tbl.ID.Desc(),
		"created_at": tbl.CreatedAt.Desc(),
		"updated_at": tbl.UpdatedAt.Desc(),
	}
	orderBys := make([]field.Expr, 0, 4)
	for _, f := range filter.AscFields() {
		if ob, ok := allowedAsc[strings.TrimSpace(f)]; ok {
			orderBys = append(orderBys, ob)
		}
	}
	for _, f := range filter.DescFields() {
		if ob, ok := allowedDesc[strings.TrimSpace(f)]; ok {
			orderBys = append(orderBys, ob)
		}
	}
	// id DESC is the default order when no valid sort field was supplied and
	// the final tie-breaker otherwise, so it is appended unconditionally.
	orderBys = append(orderBys, tbl.ID.Desc())

	items, total, err := query.Where(conds...).Order(orderBys...).FindByPage(int(filter.Offset()), int(filter.Limit))
	if err != nil {
		return nil, err
	}
	return &requests.Pager{
		Pagination: filter.Pagination,
		Total:      total,
		Items:      items,
	}, nil
}
// AdminDetail fetches a single non-deleted media asset of the tenant
// (tenant administration).
//
// tenantID and assetID must both be > 0. Returns errorx.ErrRecordNotFound
// when no matching row exists; any other query error is returned unchanged.
func (s *mediaAsset) AdminDetail(ctx context.Context, tenantID, assetID int64) (*models.MediaAsset, error) {
	if tenantID <= 0 || assetID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/asset_id must be > 0")
	}

	logFields := logrus.Fields{
		"tenant_id": tenantID,
		"asset_id":  assetID,
	}
	logrus.WithFields(logFields).Info("services.media_asset.admin.detail")

	tbl, query := models.MediaAssetQuery.QueryContext(ctx)
	conds := []gen.Condition{
		tbl.TenantID.Eq(tenantID),
		tbl.ID.Eq(assetID),
		tbl.DeletedAt.IsNull(),
	}
	asset, err := query.Where(conds...).First()
	switch {
	case err == nil:
		return asset, nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		return nil, errorx.ErrRecordNotFound.WithMsg("media asset not found")
	default:
		return nil, err
	}
}