package services import ( "context" "crypto/rand" "encoding/base32" "encoding/json" "errors" "strconv" "strings" "time" "quyun/v2/app/errorx" tenant_dto "quyun/v2/app/http/tenant/dto" "quyun/v2/app/requests" "quyun/v2/database/models" "quyun/v2/pkg/consts" pkgerrors "github.com/pkg/errors" "github.com/samber/lo" "github.com/sirupsen/logrus" "go.ipao.vip/gen" "go.ipao.vip/gen/field" "go.ipao.vip/gen/types" "gorm.io/gorm" "gorm.io/gorm/clause" ) // mediaAsset 提供媒体资源上传初始化等能力(上传/处理链路会在后续里程碑补齐)。 // // @provider type mediaAsset struct{} func newObjectKey(tenantID, userID int64, assetType consts.MediaAssetType, now time.Time) (string, error) { // object_key 作为存储定位的关键字段:必须由服务端生成,避免客户端路径注入与越权覆盖。 buf := make([]byte, 16) // 128-bit if _, err := rand.Read(buf); err != nil { return "", err } token := strings.ToLower(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(buf)) date := now.UTC().Format("20060102") return "tenants/" + strconv.FormatInt(tenantID, 10) + "/users/" + strconv.FormatInt(userID, 10) + "/" + string(assetType) + "/" + date + "/" + token, nil } // AdminUploadInit creates a MediaAsset record and returns upload parameters. // 当前版本为“stub 上传初始化”:只负责生成 asset 与 object_key,不对接外部存储签名。 func (s *mediaAsset) AdminUploadInit(ctx context.Context, tenantID, operatorUserID int64, form *tenant_dto.AdminMediaAssetUploadInitForm, now time.Time) (*tenant_dto.AdminMediaAssetUploadInitResponse, error) { if tenantID <= 0 || operatorUserID <= 0 { return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/operator_user_id must be > 0") } if form == nil { return nil, errorx.ErrInvalidParameter.WithMsg("form is nil") } if now.IsZero() { now = time.Now() } typ := consts.MediaAssetType(strings.TrimSpace(form.Type)) if typ == "" || !typ.IsValid() { return nil, errorx.ErrInvalidParameter.WithMsg("invalid type") } objectKey, err := newObjectKey(tenantID, operatorUserID, typ, now) if err != nil { return nil, pkgerrors.Wrap(err, "generate object_key failed") } metaMap := map[string]any{} if form.ContentType != "" { metaMap["content_type"] = strings.TrimSpace(form.ContentType) } if form.FileSize > 0 { metaMap["file_size"] = form.FileSize } if form.SHA256 != "" { metaMap["sha256"] = strings.ToLower(strings.TrimSpace(form.SHA256)) } metaBytes, _ := json.Marshal(metaMap) if len(metaBytes) == 0 { metaBytes = []byte("{}") } m := &models.MediaAsset{ TenantID: tenantID, UserID: operatorUserID, Type: typ, Status: consts.MediaAssetStatusUploaded, Provider: "stub", Bucket: "", ObjectKey: objectKey, Meta: types.JSON(metaBytes), CreatedAt: now, UpdatedAt: now, } if err := m.Create(ctx); err != nil { return nil, pkgerrors.Wrap(err, "create media asset failed") } logrus.WithFields(logrus.Fields{ "tenant_id": tenantID, "user_id": operatorUserID, "asset_id": m.ID, "type": typ, "object_key": objectKey, }).Info("services.media_asset.admin.upload_init") // 约定:upload_url 先返回空或内部占位;后续接入真实存储签名后再补齐。 return &tenant_dto.AdminMediaAssetUploadInitResponse{ AssetID: m.ID, Provider: m.Provider, Bucket: m.Bucket, ObjectKey: m.ObjectKey, UploadURL: "", Headers: map[string]string{}, FormFields: map[string]string{}, ExpiresAt: nil, }, nil } // AdminUploadComplete marks the asset upload as completed and transitions status uploaded -> processing. // 幂等语义: // - 若当前已是 processing/ready/failed,则直接返回当前资源,不重复触发处理。 // - 仅允许 uploaded 状态进入 processing;其他状态返回状态冲突/前置条件失败。 func (s *mediaAsset) AdminUploadComplete( ctx context.Context, tenantID, operatorUserID, assetID int64, form *tenant_dto.AdminMediaAssetUploadCompleteForm, now time.Time, ) (*models.MediaAsset, error) { if tenantID <= 0 || operatorUserID <= 0 || assetID <= 0 { return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/operator_user_id/asset_id must be > 0") } if now.IsZero() { now = time.Now() } logrus.WithFields(logrus.Fields{ "tenant_id": tenantID, "user_id": operatorUserID, "asset_id": assetID, }).Info("services.media_asset.admin.upload_complete") var out models.MediaAsset err := _db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { var m models.MediaAsset if err := tx. Clauses(clause.Locking{Strength: "UPDATE"}). Where("tenant_id = ? AND id = ?", tenantID, assetID). First(&m).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return errorx.ErrRecordNotFound.WithMsg("media asset not found") } return err } // 软删除资源不允许进入处理流程。 if m.DeletedAt.Valid { return errorx.ErrPreconditionFailed.WithMsg("media asset deleted") } // 幂等:重复 upload_complete 时返回现态。 switch m.Status { case consts.MediaAssetStatusProcessing, consts.MediaAssetStatusReady, consts.MediaAssetStatusFailed: out = m return nil case consts.MediaAssetStatusUploaded: // allowed default: return errorx.ErrStatusConflict.WithMsg("invalid media asset status") } // 合并 meta(尽量不覆盖已有字段)。 meta := map[string]any{} if len(m.Meta) > 0 { _ = json.Unmarshal(m.Meta, &meta) } meta["upload_complete_at"] = now.UTC().Format(time.RFC3339Nano) if form != nil { if strings.TrimSpace(form.ETag) != "" { meta["etag"] = strings.TrimSpace(form.ETag) } if strings.TrimSpace(form.ContentType) != "" { meta["content_type"] = strings.TrimSpace(form.ContentType) } if form.FileSize > 0 { meta["file_size"] = form.FileSize } if strings.TrimSpace(form.SHA256) != "" { meta["sha256"] = strings.ToLower(strings.TrimSpace(form.SHA256)) } } metaBytes, _ := json.Marshal(meta) if len(metaBytes) == 0 { metaBytes = []byte("{}") } // 状态迁移:uploaded -> processing if err := tx.Model(&models.MediaAsset{}). Where("id = ?", m.ID). Updates(map[string]any{ "status": consts.MediaAssetStatusProcessing, "meta": types.JSON(metaBytes), "updated_at": now, }).Error; err != nil { return err } m.Status = consts.MediaAssetStatusProcessing m.Meta = types.JSON(metaBytes) m.UpdatedAt = now out = m // 触发异步处理(当前为 stub):后续接入队列/任务系统时在此处落任务并保持幂等。 logrus.WithFields(logrus.Fields{ "tenant_id": tenantID, "user_id": operatorUserID, "asset_id": assetID, "status": m.Status, }).Info("services.media_asset.process.triggered") return nil }) if err != nil { return nil, err } return &out, nil } // AdminPage 分页查询租户内媒体资源(租户管理)。 func (s *mediaAsset) AdminPage(ctx context.Context, tenantID int64, filter *tenant_dto.AdminMediaAssetListFilter) (*requests.Pager, error) { if tenantID <= 0 { return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id must be > 0") } if filter == nil { filter = &tenant_dto.AdminMediaAssetListFilter{} } logrus.WithFields(logrus.Fields{ "tenant_id": tenantID, "type": lo.FromPtr(filter.Type), "status": lo.FromPtr(filter.Status), "created_at_from": filter.CreatedAtFrom, "created_at_to": filter.CreatedAtTo, "sort_asc_fields": filter.AscFields(), "sort_desc_fields": filter.DescFields(), }).Info("services.media_asset.admin.page") filter.Pagination.Format() tbl, query := models.MediaAssetQuery.QueryContext(ctx) conds := []gen.Condition{ tbl.TenantID.Eq(tenantID), tbl.DeletedAt.IsNull(), } if filter.Type != nil { conds = append(conds, tbl.Type.Eq(*filter.Type)) } if filter.Status != nil { conds = append(conds, tbl.Status.Eq(*filter.Status)) } if filter.CreatedAtFrom != nil { conds = append(conds, tbl.CreatedAt.Gte(*filter.CreatedAtFrom)) } if filter.CreatedAtTo != nil { conds = append(conds, tbl.CreatedAt.Lte(*filter.CreatedAtTo)) } // 排序白名单:避免把任意字符串拼进 SQL 导致注入或慢查询。 orderBys := make([]field.Expr, 0, 4) allowedAsc := map[string]field.Expr{ "id": tbl.ID.Asc(), "created_at": tbl.CreatedAt.Asc(), "updated_at": tbl.UpdatedAt.Asc(), } allowedDesc := map[string]field.Expr{ "id": tbl.ID.Desc(), "created_at": tbl.CreatedAt.Desc(), "updated_at": tbl.UpdatedAt.Desc(), } for _, f := range filter.AscFields() { f = strings.TrimSpace(f) if f == "" { continue } if ob, ok := allowedAsc[f]; ok { orderBys = append(orderBys, ob) } } for _, f := range filter.DescFields() { f = strings.TrimSpace(f) if f == "" { continue } if ob, ok := allowedDesc[f]; ok { orderBys = append(orderBys, ob) } } if len(orderBys) == 0 { orderBys = append(orderBys, tbl.ID.Desc()) } else { orderBys = append(orderBys, tbl.ID.Desc()) } items, total, err := query.Where(conds...).Order(orderBys...).FindByPage(int(filter.Offset()), int(filter.Limit)) if err != nil { return nil, err } return &requests.Pager{ Pagination: filter.Pagination, Total: total, Items: items, }, nil } // AdminDetail 查询租户内媒体资源详情(租户管理)。 func (s *mediaAsset) AdminDetail(ctx context.Context, tenantID, assetID int64) (*models.MediaAsset, error) { if tenantID <= 0 || assetID <= 0 { return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/asset_id must be > 0") } logrus.WithFields(logrus.Fields{ "tenant_id": tenantID, "asset_id": assetID, }).Info("services.media_asset.admin.detail") tbl, query := models.MediaAssetQuery.QueryContext(ctx) m, err := query.Where( tbl.TenantID.Eq(tenantID), tbl.ID.Eq(assetID), tbl.DeletedAt.IsNull(), ).First() if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil, errorx.ErrRecordNotFound.WithMsg("media asset not found") } return nil, err } return m, nil }