Files
quyun-v2/backend/app/services/order.go

1763 lines
54 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package services
import (
"bytes"
"context"
"encoding/csv"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"quyun/v2/app/errorx"
superdto "quyun/v2/app/http/super/dto"
"quyun/v2/app/http/tenant/dto"
jobs_args "quyun/v2/app/jobs/args"
"quyun/v2/app/requests"
"quyun/v2/database"
"quyun/v2/database/fields"
"quyun/v2/database/models"
"quyun/v2/pkg/consts"
provider_job "quyun/v2/providers/job"
pkgerrors "github.com/pkg/errors"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"go.ipao.vip/gen"
"go.ipao.vip/gen/field"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"go.ipao.vip/gen/types"
)
// newOrderSnapshot builds the JSON snapshot value stored on an order.
//
// payload is marshalled with encoding/json and stored as the snapshot's Data,
// tagged with kind. If marshalling fails or yields no bytes, Data falls back to
// an empty JSON object ("{}") instead of reporting the error.
func newOrderSnapshot(kind consts.OrderType, payload any) types.JSONType[fields.OrdersSnapshot] {
	raw, marshalErr := json.Marshal(payload)
	if marshalErr != nil || len(raw) == 0 {
		raw = []byte("{}")
	}

	return types.NewJSONType(fields.OrdersSnapshot{Kind: string(kind), Data: raw})
}
// AdminOrderExportCSV exports a tenant's orders as CSV text for tenant administrators.
//
// The filters in filter are applied on top of a mandatory tenant_id condition. Sorting
// accepts only the whitelisted keys id, created_at, paid_at and amount_paid (the same keys
// AdminOrderPage allows); unknown keys are ignored and "id desc" is always appended as the
// final tie-breaker. At most maxRows rows are exported.
//
// The CSV has the columns id, tenant_id, user_id, type, status, amount_paid, paid_at and
// created_at. Timestamps are rendered as RFC 3339 in UTC; a zero paid_at becomes an empty cell.
//
// It returns ErrInvalidParameter when tenantID is not positive; query and CSV writer errors
// are returned unchanged. A nil filter is treated as an empty filter.
func (s *order) AdminOrderExportCSV(
	ctx context.Context,
	tenantID int64,
	filter *dto.AdminOrderListFilter,
) (*dto.AdminOrderExportResponse, error) {
	if tenantID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id must be > 0")
	}
	if filter == nil {
		filter = &dto.AdminOrderListFilter{}
	}

	// Exporting is an expensive operation: cap the number of rows so it cannot overload the database.
	const maxRows = 5000

	// Evaluate the trimmed keyword filters once; they are used for logging and for the joins below.
	username := filter.UsernameTrimmed()
	contentTitle := filter.ContentTitleTrimmed()
	hasContentID := filter.ContentID != nil && *filter.ContentID > 0

	logrus.WithFields(logrus.Fields{
		"tenant_id":     tenantID,
		"max_rows":      maxRows,
		"user_id":       lo.FromPtr(filter.UserID),
		"username":      username,
		"content_id":    lo.FromPtr(filter.ContentID),
		"content_title": contentTitle,
		"type":          lo.FromPtr(filter.Type),
		"status":        lo.FromPtr(filter.Status),
	}).Info("services.order.admin.export_csv")

	tbl, query := models.OrderQuery.QueryContext(ctx)

	conds := []gen.Condition{tbl.TenantID.Eq(tenantID)}
	if filter.UserID != nil {
		conds = append(conds, tbl.UserID.Eq(*filter.UserID))
	}
	if filter.Type != nil {
		conds = append(conds, tbl.Type.Eq(*filter.Type))
	}
	if filter.Status != nil {
		conds = append(conds, tbl.Status.Eq(*filter.Status))
	}
	if filter.CreatedAtFrom != nil {
		conds = append(conds, tbl.CreatedAt.Gte(*filter.CreatedAtFrom))
	}
	if filter.CreatedAtTo != nil {
		conds = append(conds, tbl.CreatedAt.Lte(*filter.CreatedAtTo))
	}
	if filter.PaidAtFrom != nil {
		conds = append(conds, tbl.PaidAt.Gte(*filter.PaidAtFrom))
	}
	if filter.PaidAtTo != nil {
		conds = append(conds, tbl.PaidAt.Lte(*filter.PaidAtTo))
	}
	if filter.AmountPaidMin != nil {
		conds = append(conds, tbl.AmountPaid.Gte(*filter.AmountPaidMin))
	}
	if filter.AmountPaidMax != nil {
		conds = append(conds, tbl.AmountPaid.Lte(*filter.AmountPaidMax))
	}

	// Buyer keyword: fuzzy match on users.username.
	if username != "" {
		uTbl, _ := models.UserQuery.QueryContext(ctx)
		query = query.LeftJoin(uTbl, uTbl.ID.EqCol(tbl.UserID))
		conds = append(conds, uTbl.Username.Like(database.WrapLike(username)))
	}

	// Content filters go through order_items (and contents). One order can have several
	// items, so the joined rows are grouped by orders.id to avoid duplicate orders.
	if hasContentID || contentTitle != "" {
		oiTbl, _ := models.OrderItemQuery.QueryContext(ctx)
		query = query.LeftJoin(oiTbl, oiTbl.OrderID.EqCol(tbl.ID))
		if hasContentID {
			conds = append(conds, oiTbl.ContentID.Eq(*filter.ContentID))
		}
		if contentTitle != "" {
			cTbl, _ := models.ContentQuery.QueryContext(ctx)
			query = query.LeftJoin(cTbl, cTbl.ID.EqCol(oiTbl.ContentID))
			conds = append(conds, cTbl.Title.Like(database.WrapLike(contentTitle)))
		}
		query = query.Group(tbl.ID)
	}

	// Sort whitelist: only known columns may be used, so arbitrary input never reaches SQL.
	allowedAsc := map[string]field.Expr{
		"id":          tbl.ID.Asc(),
		"created_at":  tbl.CreatedAt.Asc(),
		"paid_at":     tbl.PaidAt.Asc(),
		"amount_paid": tbl.AmountPaid.Asc(),
	}
	allowedDesc := map[string]field.Expr{
		"id":          tbl.ID.Desc(),
		"created_at":  tbl.CreatedAt.Desc(),
		"paid_at":     tbl.PaidAt.Desc(),
		"amount_paid": tbl.AmountPaid.Desc(),
	}
	orderBys := make([]field.Expr, 0, 4)
	// Blank or unknown keys simply miss the whitelist lookup and are skipped.
	for _, f := range filter.AscFields() {
		if ob, ok := allowedAsc[strings.TrimSpace(f)]; ok {
			orderBys = append(orderBys, ob)
		}
	}
	for _, f := range filter.DescFields() {
		if ob, ok := allowedDesc[strings.TrimSpace(f)]; ok {
			orderBys = append(orderBys, ob)
		}
	}
	// "id desc" is both the default order and the trailing tie-breaker for a stable result.
	orderBys = append(orderBys, tbl.ID.Desc())

	items, err := query.Where(conds...).Order(orderBys...).Limit(maxRows).Find()
	if err != nil {
		return nil, err
	}

	buf := &bytes.Buffer{}
	w := csv.NewWriter(buf)
	// Per-record Write errors are not checked here; the writer's error state is checked
	// once via w.Error() after Flush.
	_ = w.Write([]string{"id", "tenant_id", "user_id", "type", "status", "amount_paid", "paid_at", "created_at"})
	for _, it := range items {
		if it == nil {
			continue
		}
		paidAt := ""
		if !it.PaidAt.IsZero() {
			paidAt = it.PaidAt.UTC().Format(time.RFC3339)
		}
		_ = w.Write([]string{
			fmt.Sprintf("%d", it.ID),
			fmt.Sprintf("%d", it.TenantID),
			fmt.Sprintf("%d", it.UserID),
			string(it.Type),
			string(it.Status),
			fmt.Sprintf("%d", it.AmountPaid),
			paidAt,
			it.CreatedAt.UTC().Format(time.RFC3339),
		})
	}
	w.Flush()
	if err := w.Error(); err != nil {
		return nil, err
	}

	filename := fmt.Sprintf("tenant_%d_orders_%s.csv", tenantID, time.Now().UTC().Format("20060102_150405"))
	return &dto.AdminOrderExportResponse{
		Filename:    filename,
		ContentType: "text/csv",
		CSV:         buf.String(),
	}, nil
}
// SuperOrderPage returns a page of orders for the platform side (across tenants).
//
// No tenant condition is forced; every filter is optional. Sorting accepts only the
// whitelisted keys id, tenant_id, user_id, status, created_at, paid_at and amount_paid;
// unknown keys are ignored and "id desc" is always appended as the final tie-breaker.
// After the page is loaded, the tenants and buyers referenced by the orders are fetched in
// two batched queries and attached to each item as lightweight summaries (nil when the
// referenced row is not found).
//
// A nil filter is treated as an empty filter. Query errors are returned unchanged.
func (s *order) SuperOrderPage(ctx context.Context, filter *superdto.OrderPageFilter) (*requests.Pager, error) {
	if filter == nil {
		filter = &superdto.OrderPageFilter{}
	}
	filter.Pagination.Format()

	// Evaluate the trimmed keyword filters once.
	username := filter.UsernameTrimmed()
	tenantCode := filter.TenantCodeTrimmed()
	tenantName := filter.TenantNameTrimmed()
	contentTitle := filter.ContentTitleTrimmed()
	hasContentID := filter.ContentID != nil && *filter.ContentID > 0

	tbl, query := models.OrderQuery.QueryContext(ctx)

	conds := []gen.Condition{}
	if filter.ID != nil && *filter.ID > 0 {
		conds = append(conds, tbl.ID.Eq(*filter.ID))
	}
	if filter.TenantID != nil && *filter.TenantID > 0 {
		conds = append(conds, tbl.TenantID.Eq(*filter.TenantID))
	}
	if filter.UserID != nil && *filter.UserID > 0 {
		conds = append(conds, tbl.UserID.Eq(*filter.UserID))
	}
	if filter.Type != nil && *filter.Type != "" {
		conds = append(conds, tbl.Type.Eq(*filter.Type))
	}
	if filter.Status != nil && *filter.Status != "" {
		conds = append(conds, tbl.Status.Eq(*filter.Status))
	}
	if filter.CreatedAtFrom != nil {
		conds = append(conds, tbl.CreatedAt.Gte(*filter.CreatedAtFrom))
	}
	if filter.CreatedAtTo != nil {
		conds = append(conds, tbl.CreatedAt.Lte(*filter.CreatedAtTo))
	}
	if filter.PaidAtFrom != nil {
		conds = append(conds, tbl.PaidAt.Gte(*filter.PaidAtFrom))
	}
	if filter.PaidAtTo != nil {
		conds = append(conds, tbl.PaidAt.Lte(*filter.PaidAtTo))
	}
	if filter.AmountPaidMin != nil {
		conds = append(conds, tbl.AmountPaid.Gte(*filter.AmountPaidMin))
	}
	if filter.AmountPaidMax != nil {
		conds = append(conds, tbl.AmountPaid.Lte(*filter.AmountPaidMax))
	}

	// Buyer username keyword.
	if username != "" {
		uTbl, _ := models.UserQuery.QueryContext(ctx)
		query = query.LeftJoin(uTbl, uTbl.ID.EqCol(tbl.UserID))
		conds = append(conds, uTbl.Username.Like(database.WrapLike(username)))
	}

	// Tenant code/name keywords.
	if tenantCode != "" || tenantName != "" {
		tTbl, _ := models.TenantQuery.QueryContext(ctx)
		query = query.LeftJoin(tTbl, tTbl.ID.EqCol(tbl.TenantID))
		if tenantCode != "" {
			conds = append(conds, tTbl.Code.Like(database.WrapLike(tenantCode)))
		}
		if tenantName != "" {
			conds = append(conds, tTbl.Name.Like(database.WrapLike(tenantName)))
		}
	}

	// Content filters: orders to order_items is one-to-many, so the join needs a group by.
	if hasContentID || contentTitle != "" {
		oiTbl, _ := models.OrderItemQuery.QueryContext(ctx)
		query = query.LeftJoin(oiTbl, oiTbl.OrderID.EqCol(tbl.ID))
		if hasContentID {
			conds = append(conds, oiTbl.ContentID.Eq(*filter.ContentID))
		}
		if contentTitle != "" {
			cTbl, _ := models.ContentQuery.QueryContext(ctx)
			query = query.LeftJoin(cTbl, cTbl.ID.EqCol(oiTbl.ContentID))
			conds = append(conds, cTbl.Title.Like(database.WrapLike(contentTitle)))
		}
		query = query.Group(tbl.ID)
	}

	// Sort whitelist: only known columns may be used, so arbitrary input never reaches SQL.
	allowedAsc := map[string]field.Expr{
		"id":          tbl.ID.Asc(),
		"tenant_id":   tbl.TenantID.Asc(),
		"user_id":     tbl.UserID.Asc(),
		"status":      tbl.Status.Asc(),
		"created_at":  tbl.CreatedAt.Asc(),
		"paid_at":     tbl.PaidAt.Asc(),
		"amount_paid": tbl.AmountPaid.Asc(),
	}
	allowedDesc := map[string]field.Expr{
		"id":          tbl.ID.Desc(),
		"tenant_id":   tbl.TenantID.Desc(),
		"user_id":     tbl.UserID.Desc(),
		"status":      tbl.Status.Desc(),
		"created_at":  tbl.CreatedAt.Desc(),
		"paid_at":     tbl.PaidAt.Desc(),
		"amount_paid": tbl.AmountPaid.Desc(),
	}
	orderBys := make([]field.Expr, 0, 6)
	// Blank or unknown keys simply miss the whitelist lookup and are skipped.
	for _, f := range filter.AscFields() {
		if ob, ok := allowedAsc[strings.TrimSpace(f)]; ok {
			orderBys = append(orderBys, ob)
		}
	}
	for _, f := range filter.DescFields() {
		if ob, ok := allowedDesc[strings.TrimSpace(f)]; ok {
			orderBys = append(orderBys, ob)
		}
	}
	// "id desc" is both the default order and the trailing tie-breaker for a stable result.
	orderBys = append(orderBys, tbl.ID.Desc())

	orders, total, err := query.Where(conds...).Order(orderBys...).FindByPage(int(filter.Offset()), int(filter.Limit))
	if err != nil {
		return nil, err
	}

	// Collect the distinct tenant/user IDs on this page so each table is queried once.
	tenantIDs := make([]int64, 0, len(orders))
	userIDs := make([]int64, 0, len(orders))
	for _, o := range orders {
		if o == nil {
			continue
		}
		if o.TenantID > 0 {
			tenantIDs = append(tenantIDs, o.TenantID)
		}
		if o.UserID > 0 {
			userIDs = append(userIDs, o.UserID)
		}
	}
	tenantIDs = lo.Uniq(tenantIDs)
	userIDs = lo.Uniq(userIDs)

	tenantMap := make(map[int64]*models.Tenant, len(tenantIDs))
	if len(tenantIDs) > 0 {
		tTbl, tQuery := models.TenantQuery.QueryContext(ctx)
		tenants, err := tQuery.Where(tTbl.ID.In(tenantIDs...)).Find()
		if err != nil {
			return nil, err
		}
		for _, te := range tenants {
			if te == nil {
				continue
			}
			tenantMap[te.ID] = te
		}
	}

	userMap := make(map[int64]*models.User, len(userIDs))
	if len(userIDs) > 0 {
		uTbl, uQuery := models.UserQuery.QueryContext(ctx)
		users, err := uQuery.Where(uTbl.ID.In(userIDs...)).Find()
		if err != nil {
			return nil, err
		}
		for _, u := range users {
			if u == nil {
				continue
			}
			userMap[u.ID] = u
		}
	}

	items := lo.Map(orders, func(o *models.Order, _ int) *superdto.SuperOrderItem {
		if o == nil {
			// Keep the slice aligned with the page: a nil row maps to an empty item.
			return &superdto.SuperOrderItem{}
		}

		var tenantLite *superdto.OrderTenantLite
		if te := tenantMap[o.TenantID]; te != nil {
			tenantLite = &superdto.OrderTenantLite{ID: te.ID, Code: te.Code, Name: te.Name}
		}
		var buyerLite *superdto.OrderBuyerLite
		if u := userMap[o.UserID]; u != nil {
			buyerLite = &superdto.OrderBuyerLite{ID: u.ID, Username: u.Username}
		}

		return &superdto.SuperOrderItem{
			ID:                o.ID,
			Tenant:            tenantLite,
			Buyer:             buyerLite,
			Type:              o.Type,
			Status:            o.Status,
			StatusDescription: o.Status.Description(),
			Currency:          o.Currency,
			AmountOriginal:    o.AmountOriginal,
			AmountDiscount:    o.AmountDiscount,
			AmountPaid:        o.AmountPaid,
			PaidAt:            o.PaidAt,
			RefundedAt:        o.RefundedAt,
			CreatedAt:         o.CreatedAt,
			UpdatedAt:         o.UpdatedAt,
		}
	})

	return &requests.Pager{
		Pagination: filter.Pagination,
		Total:      total,
		Items:      items,
	}, nil
}
// SuperOrderDetail loads one order for the platform side (across tenants).
//
// The order is fetched by ID with its items preloaded. When the order carries a positive
// tenant_id / user_id, the matching tenant / user row is loaded as well and returned as a
// lightweight summary; a failure of any of these lookups is returned as-is.
// It returns ErrInvalidParameter when orderID is not positive.
func (s *order) SuperOrderDetail(ctx context.Context, orderID int64) (*superdto.SuperOrderDetail, error) {
	if orderID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("order_id must be > 0")
	}

	oTbl, oQuery := models.OrderQuery.QueryContext(ctx)
	found, err := oQuery.Preload(oTbl.Items).Where(oTbl.ID.Eq(orderID)).First()
	if err != nil {
		return nil, err
	}

	// Tenant summary (left nil when the order has no tenant_id).
	var tenantInfo *superdto.OrderTenantLite
	if found.TenantID > 0 {
		tTbl, tQuery := models.TenantQuery.QueryContext(ctx)
		owner, err := tQuery.Where(tTbl.ID.Eq(found.TenantID)).First()
		if err != nil {
			return nil, err
		}
		tenantInfo = &superdto.OrderTenantLite{
			ID:   owner.ID,
			Code: owner.Code,
			Name: owner.Name,
		}
	}

	// Buyer summary (left nil when the order has no user_id).
	var buyerInfo *superdto.OrderBuyerLite
	if found.UserID > 0 {
		uTbl, uQuery := models.UserQuery.QueryContext(ctx)
		buyer, err := uQuery.Where(uTbl.ID.Eq(found.UserID)).First()
		if err != nil {
			return nil, err
		}
		buyerInfo = &superdto.OrderBuyerLite{
			ID:       buyer.ID,
			Username: buyer.Username,
		}
	}

	detail := &superdto.SuperOrderDetail{
		Order:  found,
		Tenant: tenantInfo,
		Buyer:  buyerInfo,
	}
	return detail, nil
}
// SuperRefundOrder starts a refund from the platform side (across tenants).
//
// It looks the order up by ID only, takes the tenant ID from the stored order and then
// delegates to AdminRefundOrder with all remaining arguments passed through unchanged.
// It returns ErrInvalidParameter when operatorUserID or orderID is not positive.
func (s *order) SuperRefundOrder(
	ctx context.Context,
	operatorUserID, orderID int64,
	force bool,
	reason, idempotencyKey string,
	now time.Time,
) (*models.Order, error) {
	if operatorUserID <= 0 || orderID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("operator_user_id/order_id must be > 0")
	}

	oTbl, oQuery := models.OrderQuery.QueryContext(ctx)
	target, err := oQuery.Where(oTbl.ID.Eq(orderID)).First()
	if err != nil {
		return nil, err
	}

	owningTenantID := target.TenantID
	return s.AdminRefundOrder(
		ctx,
		owningTenantID,
		operatorUserID,
		orderID,
		force,
		reason,
		idempotencyKey,
		now,
	)
}
// PurchaseContentParams is the input for "purchase content with balance inside a tenant".
type PurchaseContentParams struct {
// TenantID is the tenant ID (the multi-tenant isolation scope).
TenantID int64
// UserID is the buyer's user ID.
UserID int64
// ContentID is the ID of the content being purchased.
ContentID int64
// IdempotencyKey is the idempotency key: it ensures the same purchase request is processed at most once.
IdempotencyKey string
// Now is the logical time used for created_at/paid_at and ledger snapshots (optional; helps tests and consistency).
Now time.Time
}
// PurchaseContentResult is the purchase result (on an idempotency hit it carries the already existing order/access state).
type PurchaseContentResult struct {
// Order is the order record (may be nil, e.g. on the "already purchased, no order context" shortcut path).
Order *models.Order
// OrderItem is the order line item (this flow buys a single content, so there is normally exactly one).
OrderItem *models.OrderItem
// Access is the content access grant (expected to be active once the purchase completed).
Access *models.ContentAccess
// AmountPaid is the amount actually paid, in CNY.
// NOTE(review): the int64 is presumably expressed in minor units — confirm.
AmountPaid int64
}
// order provides the order-domain capabilities (purchase, refund, queries, ...).
//
// @provider
type order struct {
// db is the gorm handle used to open the explicit transactions of the refund flows.
db *gorm.DB
// ledger is the ledger service used to credit refunds inside those transactions.
ledger *ledger
// job is the job provider that refund jobs are enqueued on.
job *provider_job.Job
}
// SuperStatistics aggregates order statistics for the platform side (not limited to one tenant).
//
// Orders are grouped by status; each group reports its order count and the sum of
// amount_paid. The per-status rows get their status description filled in, and the
// response additionally carries the totals over all groups.
func (s *order) SuperStatistics(ctx context.Context) (*superdto.OrderStatisticsResponse, error) {
	tbl, query := models.OrderQuery.QueryContext(ctx)

	var byStatus []*superdto.OrderStatisticsRow
	if err := query.
		Select(
			tbl.Status,
			tbl.ID.Count().As("count"),
			tbl.AmountPaid.Sum().As("amount_paid_sum"),
		).
		Group(tbl.Status).
		Scan(&byStatus); err != nil {
		return nil, err
	}

	// Fold the per-status rows into the grand totals while decorating each row.
	var (
		orderCount    int64
		amountPaidSum int64
	)
	for i := range byStatus {
		group := byStatus[i]
		if group == nil {
			continue
		}
		group.StatusDescription = group.Status.Description()
		orderCount += group.Count
		amountPaidSum += group.AmountPaidSum
	}

	resp := &superdto.OrderStatisticsResponse{
		TotalCount:         orderCount,
		TotalAmountPaidSum: amountPaidSum,
		ByStatus:           byStatus,
	}
	return resp, nil
}
// ProcessRefundingOrderParams is the input of ProcessRefundingOrder.
type ProcessRefundingOrderParams struct {
// TenantID is the tenant ID.
TenantID int64
// OrderID is the order ID.
OrderID int64
// OperatorUserID is the user ID of the refund operator (used to backfill the order's audit fields).
OperatorUserID int64
// Force reports whether this is a forced refund (used to backfill the order's audit fields).
Force bool
// Reason is the refund reason (used to backfill the order's audit fields).
Reason string
// Now is the logical time (used for refunded_at/updated_at).
Now time.Time
}
// IsRefundJobNonRetryableError reports whether err is an application error whose code
// marks it as not worth retrying: invalid parameter, record not found, status conflict,
// precondition failed or permission denied.
//
// Errors that do not unwrap to *errorx.AppError, and application errors with any other
// code, yield false.
func IsRefundJobNonRetryableError(err error) bool {
	var target *errorx.AppError
	if errors.As(err, &target) {
		switch target.Code {
		case errorx.CodeInvalidParameter,
			errorx.CodeRecordNotFound,
			errorx.CodeStatusConflict,
			errorx.CodePreconditionFailed,
			errorx.CodePermissionDenied:
			return true
		}
	}
	return false
}
// enqueueOrderRefundJob hands the refund job arguments to the job provider via s.job.Add
// and returns whatever error that call reports.
func (s *order) enqueueOrderRefundJob(args jobs_args.OrderRefundJob) error {
return s.job.Add(args)
}
// MyOrderPage returns a page of the current user's orders inside a tenant.
//
// Results are always scoped to tenantID and userID, have their items preloaded and are
// ordered by id descending. Optional filters: status, paid_at range and content ID (the
// latter joins order_items and groups by orders.id).
// It returns ErrInvalidParameter when tenantID or userID is not positive; a nil filter is
// treated as an empty filter.
func (s *order) MyOrderPage(
	ctx context.Context,
	tenantID, userID int64,
	filter *dto.MyOrderListFilter,
) (*requests.Pager, error) {
	if tenantID <= 0 || userID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/user_id must be > 0")
	}
	if filter == nil {
		filter = &dto.MyOrderListFilter{}
	}

	logFields := logrus.Fields{
		"tenant_id":  tenantID,
		"user_id":    userID,
		"status":     lo.FromPtr(filter.Status),
		"content_id": lo.FromPtr(filter.ContentID),
	}
	logrus.WithFields(logFields).Info("services.order.me.page")

	filter.Pagination.Format()

	oTbl, q := models.OrderQuery.QueryContext(ctx)
	q = q.Preload(oTbl.Items)

	// Mandatory scope first, optional filters afterwards.
	where := make([]gen.Condition, 0, 6)
	where = append(where, oTbl.TenantID.Eq(tenantID), oTbl.UserID.Eq(userID))
	if st := filter.Status; st != nil {
		where = append(where, oTbl.Status.Eq(*st))
	}
	if from := filter.PaidAtFrom; from != nil {
		where = append(where, oTbl.PaidAt.Gte(*from))
	}
	if to := filter.PaidAtTo; to != nil {
		where = append(where, oTbl.PaidAt.Lte(*to))
	}
	if cid := filter.ContentID; cid != nil && *cid > 0 {
		// One order may own several items: join, then group by the order ID.
		itemTbl, _ := models.OrderItemQuery.QueryContext(ctx)
		q = q.LeftJoin(itemTbl, itemTbl.OrderID.EqCol(oTbl.ID))
		where = append(where, itemTbl.ContentID.Eq(*cid))
		q = q.Group(oTbl.ID)
	}

	offset, limit := int(filter.Offset()), int(filter.Limit)
	rows, count, err := q.Where(where...).Order(oTbl.ID.Desc()).FindByPage(offset, limit)
	if err != nil {
		return nil, err
	}

	page := &requests.Pager{
		Pagination: filter.Pagination,
		Total:      count,
		Items:      rows,
	}
	return page, nil
}
// MyOrderDetail loads one of the current user's orders inside a tenant.
//
// The order must match tenantID, userID and orderID together; its items are preloaded.
// It returns ErrInvalidParameter when any of the three IDs is not positive, and the query
// error unchanged when the lookup fails.
func (s *order) MyOrderDetail(ctx context.Context, tenantID, userID, orderID int64) (*models.Order, error) {
	if tenantID <= 0 || userID <= 0 || orderID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/user_id/order_id must be > 0")
	}

	logFields := logrus.Fields{
		"tenant_id": tenantID,
		"user_id":   userID,
		"order_id":  orderID,
	}
	logrus.WithFields(logFields).Info("services.order.me.detail")

	oTbl, q := models.OrderQuery.QueryContext(ctx)
	found, err := q.
		Preload(oTbl.Items).
		Where(
			oTbl.TenantID.Eq(tenantID),
			oTbl.UserID.Eq(userID),
			oTbl.ID.Eq(orderID),
		).
		First()
	if err == nil {
		return found, nil
	}
	return nil, err
}
// AdminOrderPage returns a page of a tenant's orders for tenant administrators.
//
// Results are always scoped to tenantID and have their items preloaded. All filters are
// optional. Sorting accepts only the whitelisted keys id, created_at, paid_at and
// amount_paid; unknown keys are ignored and "id desc" is always appended as the final
// tie-breaker (it is therefore also the default order).
//
// It returns ErrInvalidParameter when tenantID is not positive; a nil filter is treated as
// an empty filter. Query errors are returned unchanged.
func (s *order) AdminOrderPage(
	ctx context.Context,
	tenantID int64,
	filter *dto.AdminOrderListFilter,
) (*requests.Pager, error) {
	if tenantID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id must be > 0")
	}
	if filter == nil {
		filter = &dto.AdminOrderListFilter{}
	}

	// Evaluate the trimmed keyword filters once; they are used for logging and for the joins below.
	username := filter.UsernameTrimmed()
	contentTitle := filter.ContentTitleTrimmed()
	hasContentID := filter.ContentID != nil && *filter.ContentID > 0

	logrus.WithFields(logrus.Fields{
		"tenant_id":       tenantID,
		"user_id":         lo.FromPtr(filter.UserID),
		"username":        username,
		"content_id":      lo.FromPtr(filter.ContentID),
		"content_title":   contentTitle,
		"type":            lo.FromPtr(filter.Type),
		"status":          lo.FromPtr(filter.Status),
		"created_at_from": filter.CreatedAtFrom,
		"created_at_to":   filter.CreatedAtTo,
		"paid_at_from":    filter.PaidAtFrom,
		"paid_at_to":      filter.PaidAtTo,
		"amount_paid_min": filter.AmountPaidMin,
		"amount_paid_max": filter.AmountPaidMax,
	}).Info("services.order.admin.page")

	filter.Pagination.Format()

	tbl, query := models.OrderQuery.QueryContext(ctx)
	query = query.Preload(tbl.Items)

	conds := []gen.Condition{tbl.TenantID.Eq(tenantID)}
	if filter.UserID != nil {
		conds = append(conds, tbl.UserID.Eq(*filter.UserID))
	}
	if filter.Type != nil {
		conds = append(conds, tbl.Type.Eq(*filter.Type))
	}
	if filter.Status != nil {
		conds = append(conds, tbl.Status.Eq(*filter.Status))
	}
	if filter.CreatedAtFrom != nil {
		conds = append(conds, tbl.CreatedAt.Gte(*filter.CreatedAtFrom))
	}
	if filter.CreatedAtTo != nil {
		conds = append(conds, tbl.CreatedAt.Lte(*filter.CreatedAtTo))
	}
	if filter.PaidAtFrom != nil {
		conds = append(conds, tbl.PaidAt.Gte(*filter.PaidAtFrom))
	}
	if filter.PaidAtTo != nil {
		conds = append(conds, tbl.PaidAt.Lte(*filter.PaidAtTo))
	}
	if filter.AmountPaidMin != nil {
		conds = append(conds, tbl.AmountPaid.Gte(*filter.AmountPaidMin))
	}
	if filter.AmountPaidMax != nil {
		conds = append(conds, tbl.AmountPaid.Lte(*filter.AmountPaidMax))
	}

	// User keyword: fuzzy match on users.username.
	// orders.user_id references a single users.id, so this join cannot duplicate rows and
	// needs no group by.
	if username != "" {
		uTbl, _ := models.UserQuery.QueryContext(ctx)
		query = query.LeftJoin(uTbl, uTbl.ID.EqCol(tbl.UserID))
		conds = append(conds, uTbl.Username.Like(database.WrapLike(username)))
	}

	// Content filters go through order_items (and contents).
	// orders to order_items is one-to-many, so after the join the rows must be grouped by
	// orders.id to avoid returning the same order more than once.
	if hasContentID || contentTitle != "" {
		oiTbl, _ := models.OrderItemQuery.QueryContext(ctx)
		query = query.LeftJoin(oiTbl, oiTbl.OrderID.EqCol(tbl.ID))
		if hasContentID {
			conds = append(conds, oiTbl.ContentID.Eq(*filter.ContentID))
		}
		if contentTitle != "" {
			cTbl, _ := models.ContentQuery.QueryContext(ctx)
			query = query.LeftJoin(cTbl, cTbl.ID.EqCol(oiTbl.ContentID))
			conds = append(conds, cTbl.Title.Like(database.WrapLike(contentTitle)))
		}
		query = query.Group(tbl.ID)
	}

	// Sort whitelist: only known columns may be used, so arbitrary input never reaches SQL.
	allowedAsc := map[string]field.Expr{
		"id":          tbl.ID.Asc(),
		"created_at":  tbl.CreatedAt.Asc(),
		"paid_at":     tbl.PaidAt.Asc(),
		"amount_paid": tbl.AmountPaid.Asc(),
	}
	allowedDesc := map[string]field.Expr{
		"id":          tbl.ID.Desc(),
		"created_at":  tbl.CreatedAt.Desc(),
		"paid_at":     tbl.PaidAt.Desc(),
		"amount_paid": tbl.AmountPaid.Desc(),
	}
	orderBys := make([]field.Expr, 0, 4)
	// Blank or unknown keys simply miss the whitelist lookup and are skipped.
	for _, f := range filter.AscFields() {
		if ob, ok := allowedAsc[strings.TrimSpace(f)]; ok {
			orderBys = append(orderBys, ob)
		}
	}
	for _, f := range filter.DescFields() {
		if ob, ok := allowedDesc[strings.TrimSpace(f)]; ok {
			orderBys = append(orderBys, ob)
		}
	}
	// "id desc" is both the default order and the trailing tie-breaker that keeps paging
	// stable (especially for the join + group case).
	orderBys = append(orderBys, tbl.ID.Desc())

	items, total, err := query.Where(conds...).Order(orderBys...).FindByPage(int(filter.Offset()), int(filter.Limit))
	if err != nil {
		return nil, err
	}

	return &requests.Pager{
		Pagination: filter.Pagination,
		Total:      total,
		Items:      items,
	}, nil
}
// AdminOrderDetail loads one order of a tenant for tenant administrators.
//
// The order must match both tenantID and orderID; its items are preloaded.
// It returns ErrInvalidParameter when either ID is not positive, and the query error
// unchanged when the lookup fails.
func (s *order) AdminOrderDetail(ctx context.Context, tenantID, orderID int64) (*models.Order, error) {
	if tenantID <= 0 || orderID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/order_id must be > 0")
	}

	logFields := logrus.Fields{
		"tenant_id": tenantID,
		"order_id":  orderID,
	}
	logrus.WithFields(logFields).Info("services.order.admin.detail")

	oTbl, q := models.OrderQuery.QueryContext(ctx)
	found, err := q.
		Preload(oTbl.Items).
		Where(oTbl.TenantID.Eq(tenantID), oTbl.ID.Eq(orderID)).
		First()
	if err == nil {
		return found, nil
	}
	return nil, err
}
// AdminRefundOrder requests a refund for a paid order (a forced refund is supported).
//
// Semantics:
//   - This method only moves the order to "refunding", records the audit fields of the
//     request, and enqueues the asynchronous refund job.
//   - Crediting the balance and revoking access is done asynchronously by the job/worker
//     (see ProcessRefundingOrder).
//
// Orders that are already refunded or refunding are returned as they are (idempotent);
// a refunding order still gets its job enqueued again. Only paid or failed orders may start
// a refund; unless force is set, the request must fall inside the default refund window
// counted from paid_at. A zero now is replaced by the current time.
func (s *order) AdminRefundOrder(
	ctx context.Context,
	tenantID, operatorUserID, orderID int64,
	force bool,
	reason, idempotencyKey string,
	now time.Time,
) (*models.Order, error) {
	if tenantID <= 0 || operatorUserID <= 0 || orderID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/operator_user_id/order_id must be > 0")
	}
	if now.IsZero() {
		now = time.Now()
	}

	requestLog := logrus.WithFields(logrus.Fields{
		"tenant_id":        tenantID,
		"operator_user_id": operatorUserID,
		"order_id":         orderID,
		"force":            force,
		"idempotency_key":  idempotencyKey,
	})
	requestLog.Info("services.order.admin.refund")

	var out *models.Order
	txErr := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		// Lock the order row so concurrent or repeated refund requests cannot corrupt its state.
		locked := new(models.Order)
		lookupErr := tx.
			Clauses(clause.Locking{Strength: "UPDATE"}).
			Preload("Items").
			Where("tenant_id = ? AND id = ?", tenantID, orderID).
			First(locked).Error
		if lookupErr != nil {
			return lookupErr
		}

		// State machine.
		switch locked.Status {
		case consts.OrderStatusRefunded, consts.OrderStatusRefunding:
			// Already refunded / being refunded: idempotent return.
			out = locked
			return nil
		case consts.OrderStatusPaid, consts.OrderStatusFailed:
			// Paid orders may start a refund. Failed means a previous asynchronous refund
			// did not complete or was marked failed; an administrator may retry it.
		default:
			return errorx.ErrStatusConflict.WithMsg("订单非已支付状态,无法退款")
		}

		if locked.PaidAt.IsZero() {
			return errorx.ErrPreconditionFailed.WithMsg("订单缺少 paid_at无法退款")
		}

		// Time window: paid_at + the default refund window; force=true bypasses it.
		if !force && now.After(locked.PaidAt.Add(consts.DefaultOrderRefundWindow)) {
			return errorx.ErrPreconditionFailed.WithMsg("已超过默认退款时间窗")
		}

		// Move the order to refunding and store the audit fields of this request; the
		// actual refund is carried out by the asynchronous job.
		patch := map[string]any{
			"status":                  consts.OrderStatusRefunding,
			"refund_forced":           force,
			"refund_operator_user_id": operatorUserID,
			"refund_reason":           reason,
			"updated_at":              now,
		}
		if err := tx.Table(models.TableNameOrder).Where("id = ?", locked.ID).Updates(patch).Error; err != nil {
			return err
		}

		// Mirror the persisted columns on the in-memory model that is returned.
		locked.Status = consts.OrderStatusRefunding
		locked.RefundForced = force
		locked.RefundOperatorUserID = operatorUserID
		locked.RefundReason = reason
		locked.UpdatedAt = now
		out = locked
		return nil
	})
	if txErr != nil {
		requestLog.WithError(txErr).Warn("services.order.admin.refund.failed")
		return nil, txErr
	}

	// A refunding order must have its asynchronous job enqueued: if enqueueing fails the
	// error is returned so the caller can retry (the request is idempotent).
	if out != nil && out.Status == consts.OrderStatusRefunding {
		job := jobs_args.OrderRefundJob{
			TenantID:       tenantID,
			OrderID:        out.ID,
			OperatorUserID: operatorUserID,
			Force:          force,
			Reason:         reason,
		}
		enqueueLog := logrus.WithFields(logrus.Fields{
			"tenant_id":        tenantID,
			"operator_user_id": operatorUserID,
			"order_id":         out.ID,
		})
		if enqueueErr := s.enqueueOrderRefundJob(job); enqueueErr != nil {
			enqueueLog.WithError(enqueueErr).Warn("services.order.admin.refund.enqueue_failed")
			return nil, enqueueErr
		}
		enqueueLog.Info("services.order.admin.refund.enqueued")
	}

	logrus.WithFields(logrus.Fields{
		"tenant_id":        tenantID,
		"operator_user_id": operatorUserID,
		"order_id":         orderID,
		"status":           out.Status,
		"refund_forced":    out.RefundForced,
	}).Info("services.order.admin.refund.ok")

	return out, nil
}
// ProcessRefundingOrder processes an order in "refunding": it credits the balance back,
// revokes the purchased access and moves the order to "refunded".
// It is called by the asynchronous job/worker and must stay idempotent.
//
// Behavior:
//   - params must be non-nil with positive TenantID and OrderID, otherwise
//     ErrInvalidParameter is returned. A zero params.Now is replaced by the current time
//     (written back to params).
//   - The order row is locked inside one transaction. A missing order yields
//     ErrRecordNotFound.
//   - Orders already refunded or failed are returned unchanged (idempotent).
//   - Any other status than refunding yields ErrStatusConflict and changes nothing in the
//     database. This method does not persist the failed status itself: every write made in
//     the transaction is rolled back once the closure returns an error. Callers that want
//     the terminal state recorded can use MarkRefundFailed (IsRefundJobNonRetryableError
//     classifies this error as non-retryable).
//   - Errors are returned unwrapped so the caller can classify them.
func (s *order) ProcessRefundingOrder(ctx context.Context, params *ProcessRefundingOrderParams) (*models.Order, error) {
	if params == nil {
		return nil, errorx.ErrInvalidParameter.WithMsg("params is required")
	}
	if params.TenantID <= 0 || params.OrderID <= 0 {
		return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/order_id must be > 0")
	}
	if params.Now.IsZero() {
		params.Now = time.Now()
	}

	logrus.WithFields(logrus.Fields{
		"tenant_id":        params.TenantID,
		"order_id":         params.OrderID,
		"operator_user_id": params.OperatorUserID,
		"force":            params.Force,
	}).Info("services.order.refund.process")

	var out *models.Order
	err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		var orderModel models.Order
		if err := tx.
			Clauses(clause.Locking{Strength: "UPDATE"}).
			Preload("Items").
			Where("tenant_id = ? AND id = ?", params.TenantID, params.OrderID).
			First(&orderModel).Error; err != nil {
			if errors.Is(err, gorm.ErrRecordNotFound) {
				return errorx.ErrRecordNotFound.WithMsg("order not found")
			}
			return err
		}

		// Idempotency: already refunded / already failed orders are returned as they are.
		if orderModel.Status == consts.OrderStatusRefunded || orderModel.Status == consts.OrderStatusFailed {
			out = &orderModel
			return nil
		}

		// Only refunding orders may enter the processing chain; paid->refunding must have
		// been done by the API layer, which also records the audit fields.
		if orderModel.Status != consts.OrderStatusRefunding {
			// Non-retryable: the status is not what the job expects. Nothing is written
			// here on purpose: returning an error rolls this transaction back, so an
			// UPDATE to "failed" issued at this point would never be committed.
			return errorx.ErrStatusConflict.WithMsg("order not in refunding status")
		}

		// Audit fields: the values stored on the order win; when they are empty the job
		// parameters are used as a fallback (covers legacy data / abnormal paths).
		operatorUserID := orderModel.RefundOperatorUserID
		if operatorUserID == 0 {
			operatorUserID = params.OperatorUserID
		}
		reason := orderModel.RefundReason
		if strings.TrimSpace(reason) == "" {
			reason = strings.TrimSpace(params.Reason)
		}
		force := orderModel.RefundForced
		if !force {
			force = params.Force
		}

		amount := orderModel.AmountPaid
		refundKey := fmt.Sprintf("refund:%d", orderModel.ID)

		// Credit the balance first (ledger entry), then revoke access, and only then move
		// the order to its terminal state, so the refund stays reconcilable and traceable.
		if amount > 0 {
			if _, err := s.ledger.CreditRefundTx(
				ctx,
				tx,
				params.TenantID,
				operatorUserID,
				orderModel.UserID,
				orderModel.ID,
				amount,
				refundKey,
				reason,
				params.Now,
			); err != nil {
				return err
			}
		}

		// Revoke the buyer's access to every content item of the order.
		for _, item := range orderModel.Items {
			if item == nil {
				continue
			}
			if err := tx.Table(models.TableNameContentAccess).
				Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, orderModel.UserID, item.ContentID).
				Updates(map[string]any{
					"status":     consts.ContentAccessStatusRevoked,
					"revoked_at": params.Now,
					"updated_at": params.Now,
				}).Error; err != nil {
				return err
			}
		}

		if err := tx.Table(models.TableNameOrder).
			Where("id = ?", orderModel.ID).
			Updates(map[string]any{
				"status":                  consts.OrderStatusRefunded,
				"refunded_at":             params.Now,
				"refund_forced":           force,
				"refund_operator_user_id": operatorUserID,
				"refund_reason":           reason,
				"updated_at":              params.Now,
			}).Error; err != nil {
			return err
		}

		// Mirror the persisted columns on the in-memory model that is returned.
		orderModel.Status = consts.OrderStatusRefunded
		orderModel.RefundedAt = params.Now
		orderModel.RefundForced = force
		orderModel.RefundOperatorUserID = operatorUserID
		orderModel.RefundReason = reason
		orderModel.UpdatedAt = params.Now
		out = &orderModel
		return nil
	})
	if err != nil {
		// Cancelling the job on non-retryable errors is the worker's responsibility; the
		// original error is kept so the worker can classify it.
		logrus.WithFields(logrus.Fields{
			"tenant_id": params.TenantID,
			"order_id":  params.OrderID,
		}).WithError(err).Warn("services.order.refund.process.failed")
		return nil, err
	}

	return out, nil
}
// MarkRefundFailed marks an order as failed during asynchronous refund processing.
//
// It is meant only for the worker, once it has classified an error as non-retryable, so
// that the order does not stay in "refunding" forever. The order row is locked inside a
// transaction; orders that are already refunded or failed are left untouched, any other
// status is overwritten with failed and updated_at is set to now.
// It returns ErrInvalidParameter when tenantID or orderID is not positive. A zero now is
// replaced by the current time.
func (s *order) MarkRefundFailed(ctx context.Context, tenantID, orderID int64, now time.Time) error {
	if tenantID <= 0 || orderID <= 0 {
		return errorx.ErrInvalidParameter.WithMsg("tenant_id/order_id must be > 0")
	}
	if now.IsZero() {
		now = time.Now()
	}

	markFailed := func(tx *gorm.DB) error {
		current := new(models.Order)
		lookup := tx.
			Clauses(clause.Locking{Strength: "UPDATE"}).
			Where("tenant_id = ? AND id = ?", tenantID, orderID).
			First(current)
		if lookup.Error != nil {
			return lookup.Error
		}

		// Terminal states (refunded / failed) need no change.
		switch current.Status {
		case consts.OrderStatusRefunded, consts.OrderStatusFailed:
			return nil
		}

		patch := map[string]any{
			"status":     consts.OrderStatusFailed,
			"updated_at": now,
		}
		return tx.Table(models.TableNameOrder).Where("id = ?", current.ID).Updates(patch).Error
	}

	return s.db.WithContext(ctx).Transaction(markFailed)
}
func (s *order) PurchaseContent(ctx context.Context, params *PurchaseContentParams) (*PurchaseContentResult, error) {
if params == nil {
return nil, errorx.ErrInvalidParameter.WithMsg("params is required")
}
if params.TenantID <= 0 || params.UserID <= 0 || params.ContentID <= 0 {
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/user_id/content_id must be > 0")
}
now := params.Now
if now.IsZero() {
now = time.Now()
}
logrus.WithFields(logrus.Fields{
"tenant_id": params.TenantID,
"user_id": params.UserID,
"content_id": params.ContentID,
"idempotency_key": params.IdempotencyKey,
}).Info("services.order.purchase_content")
var out PurchaseContentResult
// 幂等购买采用“三段式”流程,保证一致性:
// 1) 先独立事务冻结余额(预留资金);
// 2) 再用单事务写订单+扣款+授予权益;
// 3) 若第 2 步失败,则解冻并写入回滚标记,保证重试稳定返回“失败+已回滚”。
if params.IdempotencyKey != "" {
freezeKey := fmt.Sprintf("%s:freeze", params.IdempotencyKey)
debitKey := fmt.Sprintf("%s:debit", params.IdempotencyKey)
rollbackKey := fmt.Sprintf("%s:rollback", params.IdempotencyKey)
// 1) 若该幂等键已生成订单,则直接返回订单与权益(幂等命中)。
{
tbl, query := models.OrderQuery.QueryContext(ctx)
existing, err := query.Preload(tbl.Items).Where(
tbl.TenantID.Eq(params.TenantID),
tbl.UserID.Eq(params.UserID),
tbl.IdempotencyKey.Eq(params.IdempotencyKey),
).First()
if err == nil {
out.Order = existing
if len(existing.Items) > 0 {
out.OrderItem = existing.Items[0]
}
out.AmountPaid = existing.AmountPaid
if out.OrderItem != nil {
aTbl, aQuery := models.ContentAccessQuery.QueryContext(ctx)
access, err := aQuery.Where(
aTbl.TenantID.Eq(params.TenantID),
aTbl.UserID.Eq(params.UserID),
aTbl.ContentID.Eq(out.OrderItem.ContentID),
).First()
if err == nil {
out.Access = access
}
}
return &out, nil
}
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
return nil, err
}
}
// 2) 若历史已回滚过该幂等请求,则稳定返回“失败+已回滚”(避免重复冻结/重复扣款)。
{
tbl, query := models.TenantLedgerQuery.QueryContext(ctx)
_, err := query.Where(
tbl.TenantID.Eq(params.TenantID),
tbl.UserID.Eq(params.UserID),
tbl.IdempotencyKey.Eq(rollbackKey),
).First()
if err == nil {
return nil, errorx.ErrOperationFailed.WithMsg("失败+已回滚")
}
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
return nil, err
}
}
// 查询内容与价格:放在事务外简化逻辑;后续以订单事务为准。
var content models.Content
{
tbl, query := models.ContentQuery.QueryContext(ctx)
m, err := query.Where(
tbl.TenantID.Eq(params.TenantID),
tbl.ID.Eq(params.ContentID),
tbl.DeletedAt.IsNull(),
).First()
if err != nil {
return nil, err
}
content = *m
}
if content.Status != consts.ContentStatusPublished {
return nil, errorx.ErrPreconditionFailed.WithMsg("content not published")
}
// 作者自购:直接授予权益(不走余额冻结/扣款)。
if content.UserID == params.UserID {
err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
if err := s.grantAccess(ctx, tx, params.TenantID, params.UserID, params.ContentID, 0, now); err != nil {
return err
}
var access models.ContentAccess
if err := tx.Table(models.TableNameContentAccess).
Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, params.UserID, params.ContentID).
First(&access).Error; err != nil {
return err
}
out.AmountPaid = 0
out.Access = &access
return nil
})
if err != nil {
return nil, err
}
return &out, nil
}
priceAmount := int64(0)
var price models.ContentPrice
{
tbl, query := models.ContentPriceQuery.QueryContext(ctx)
m, err := query.Where(
tbl.TenantID.Eq(params.TenantID),
tbl.ContentID.Eq(params.ContentID),
).First()
if err == nil {
price = *m
priceAmount = m.PriceAmount
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
return nil, err
}
}
amountPaid := s.computeFinalPrice(priceAmount, &price, now)
out.AmountPaid = amountPaid
discountType := price.DiscountType
if discountType == "" {
discountType = consts.DiscountTypeNone
}
var discountStartAt *time.Time
if !price.DiscountStartAt.IsZero() {
t := price.DiscountStartAt
discountStartAt = &t
}
var discountEndAt *time.Time
if !price.DiscountEndAt.IsZero() {
t := price.DiscountEndAt
discountEndAt = &t
}
purchaseSnapshot := newOrderSnapshot(consts.OrderTypeContentPurchase, &fields.OrdersContentPurchaseSnapshot{
ContentID: content.ID,
ContentTitle: content.Title,
ContentUserID: content.UserID,
ContentVisibility: content.Visibility,
PreviewSeconds: content.PreviewSeconds,
PreviewDownloadable: content.PreviewDownloadable,
Currency: consts.CurrencyCNY,
PriceAmount: priceAmount,
DiscountType: discountType,
DiscountValue: price.DiscountValue,
DiscountStartAt: discountStartAt,
DiscountEndAt: discountEndAt,
AmountOriginal: priceAmount,
AmountDiscount: priceAmount - amountPaid,
AmountPaid: amountPaid,
PurchaseAt: now,
PurchaseIdempotency: params.IdempotencyKey,
})
itemSnapshot := types.NewJSONType(fields.OrderItemsSnapshot{
ContentID: content.ID,
ContentTitle: content.Title,
ContentUserID: content.UserID,
AmountPaid: amountPaid,
})
// 免费内容:无需冻结,保持单事务写订单+权益。
if amountPaid == 0 {
err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
orderModel := &models.Order{
TenantID: params.TenantID,
UserID: params.UserID,
Type: consts.OrderTypeContentPurchase,
Status: consts.OrderStatusPaid,
Currency: consts.CurrencyCNY,
AmountOriginal: priceAmount,
AmountDiscount: priceAmount - amountPaid,
AmountPaid: amountPaid,
Snapshot: purchaseSnapshot,
IdempotencyKey: params.IdempotencyKey,
PaidAt: now,
CreatedAt: now,
UpdatedAt: now,
}
if err := tx.Create(orderModel).Error; err != nil {
return err
}
item := &models.OrderItem{
TenantID: params.TenantID,
UserID: params.UserID,
OrderID: orderModel.ID,
ContentID: params.ContentID,
ContentUserID: content.UserID,
AmountPaid: amountPaid,
Snapshot: itemSnapshot,
CreatedAt: now,
UpdatedAt: now,
}
if err := tx.Create(item).Error; err != nil {
return err
}
if err := s.grantAccess(ctx, tx, params.TenantID, params.UserID, params.ContentID, orderModel.ID, now); err != nil {
return err
}
var access models.ContentAccess
if err := tx.Table(models.TableNameContentAccess).
Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, params.UserID, params.ContentID).
First(&access).Error; err != nil {
return err
}
out.Order = orderModel
out.OrderItem = item
out.Access = &access
return nil
})
if err != nil {
return nil, pkgerrors.Wrap(err, "purchase content failed")
}
return &out, nil
}
// 3) 独立事务冻结余额:便于后续在订单事务失败时做补偿解冻。
if err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
_, err := s.ledger.FreezeTx(ctx, tx, params.TenantID, params.UserID, params.UserID, 0, amountPaid, freezeKey, "purchase freeze", now)
return err
}); err != nil {
return nil, pkgerrors.Wrap(err, "purchase freeze failed")
}
// 4) 单事务完成:落订单 → 账本扣款(消耗冻结)→ 更新订单 paid → 授予权益。
if err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
orderModel := &models.Order{
TenantID: params.TenantID,
UserID: params.UserID,
Type: consts.OrderTypeContentPurchase,
Status: consts.OrderStatusCreated,
Currency: consts.CurrencyCNY,
AmountOriginal: priceAmount,
AmountDiscount: priceAmount - amountPaid,
AmountPaid: amountPaid,
Snapshot: purchaseSnapshot,
IdempotencyKey: params.IdempotencyKey,
CreatedAt: now,
UpdatedAt: now,
}
if err := tx.Create(orderModel).Error; err != nil {
return err
}
item := &models.OrderItem{
TenantID: params.TenantID,
UserID: params.UserID,
OrderID: orderModel.ID,
ContentID: params.ContentID,
ContentUserID: content.UserID,
AmountPaid: amountPaid,
Snapshot: itemSnapshot,
CreatedAt: now,
UpdatedAt: now,
}
if err := tx.Create(item).Error; err != nil {
return err
}
if _, err := s.ledger.DebitPurchaseTx(ctx, tx, params.TenantID, params.UserID, params.UserID, orderModel.ID, amountPaid, debitKey, "purchase debit", now); err != nil {
return err
}
if err := tx.Model(&models.Order{}).
Where("id = ?", orderModel.ID).
Updates(map[string]any{
"status": consts.OrderStatusPaid,
"paid_at": now,
"updated_at": now,
}).Error; err != nil {
return err
}
// 关键点:上面是 DB 更新;这里同步更新内存对象,避免返回给调用方的状态仍为 created。
orderModel.Status = consts.OrderStatusPaid
orderModel.PaidAt = now
orderModel.UpdatedAt = now
if err := s.grantAccess(ctx, tx, params.TenantID, params.UserID, params.ContentID, orderModel.ID, now); err != nil {
return err
}
var access models.ContentAccess
if err := tx.Table(models.TableNameContentAccess).
Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, params.UserID, params.ContentID).
First(&access).Error; err != nil {
return err
}
out.Order = orderModel
out.OrderItem = item
out.Access = &access
return nil
}); err != nil {
// 5) 补偿:订单事务失败时,必须解冻,并写入回滚标记,保证后续幂等重试稳定返回失败。
_ = s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
_, e1 := s.ledger.UnfreezeTx(
ctx,
tx,
params.TenantID,
params.UserID, // operator_user_id购买者本人下单链路中的补偿动作
params.UserID,
0,
amountPaid,
rollbackKey,
"purchase rollback",
now,
)
return e1
})
logrus.WithFields(logrus.Fields{
"tenant_id": params.TenantID,
"user_id": params.UserID,
"content_id": params.ContentID,
"idempotency_key": params.IdempotencyKey,
}).WithError(err).Warn("services.order.purchase_content.rollback")
return nil, errorx.ErrOperationFailed.WithMsg("失败+已回滚")
}
logrus.WithFields(logrus.Fields{
"tenant_id": params.TenantID,
"user_id": params.UserID,
"content_id": params.ContentID,
"order_id": loID(out.Order),
"amount_paid": out.AmountPaid,
"idempotency_key": params.IdempotencyKey,
}).Info("services.order.purchase_content.ok")
return &out, nil
}
// 非幂等请求走“单事务”旧流程:冻结 + 落单 + 扣款 + 授权全部在一个事务内完成(失败整体回滚)。
err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var content models.Content
if err := tx.
Where("tenant_id = ? AND id = ? AND deleted_at IS NULL", params.TenantID, params.ContentID).
First(&content).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errorx.ErrRecordNotFound.WithMsg("content not found")
}
return err
}
if content.Status != consts.ContentStatusPublished {
return errorx.ErrPreconditionFailed.WithMsg("content not published")
}
var accessExisting models.ContentAccess
if err := tx.
Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, params.UserID, params.ContentID).
First(&accessExisting).Error; err == nil {
if accessExisting.Status == consts.ContentAccessStatusActive {
out.Access = &accessExisting
return nil
}
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
var price models.ContentPrice
priceAmount := int64(0)
if err := tx.Where("tenant_id = ? AND content_id = ?", params.TenantID, params.ContentID).First(&price).Error; err == nil {
priceAmount = price.PriceAmount
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
amountPaid := s.computeFinalPrice(priceAmount, &price, now)
out.AmountPaid = amountPaid
discountType := price.DiscountType
if discountType == "" {
discountType = consts.DiscountTypeNone
}
var discountStartAt *time.Time
if !price.DiscountStartAt.IsZero() {
t := price.DiscountStartAt
discountStartAt = &t
}
var discountEndAt *time.Time
if !price.DiscountEndAt.IsZero() {
t := price.DiscountEndAt
discountEndAt = &t
}
purchaseSnapshot := newOrderSnapshot(consts.OrderTypeContentPurchase, &fields.OrdersContentPurchaseSnapshot{
ContentID: content.ID,
ContentTitle: content.Title,
ContentUserID: content.UserID,
ContentVisibility: content.Visibility,
PreviewSeconds: content.PreviewSeconds,
PreviewDownloadable: content.PreviewDownloadable,
Currency: consts.CurrencyCNY,
PriceAmount: priceAmount,
DiscountType: discountType,
DiscountValue: price.DiscountValue,
DiscountStartAt: discountStartAt,
DiscountEndAt: discountEndAt,
AmountOriginal: priceAmount,
AmountDiscount: priceAmount - amountPaid,
AmountPaid: amountPaid,
PurchaseAt: now,
})
itemSnapshot := types.NewJSONType(fields.OrderItemsSnapshot{
ContentID: content.ID,
ContentTitle: content.Title,
ContentUserID: content.UserID,
AmountPaid: amountPaid,
})
if amountPaid == 0 {
orderModel := &models.Order{
TenantID: params.TenantID,
UserID: params.UserID,
Type: consts.OrderTypeContentPurchase,
Status: consts.OrderStatusPaid,
Currency: consts.CurrencyCNY,
AmountOriginal: priceAmount,
AmountDiscount: priceAmount - amountPaid,
AmountPaid: amountPaid,
Snapshot: purchaseSnapshot,
IdempotencyKey: "",
PaidAt: now,
CreatedAt: now,
UpdatedAt: now,
}
if err := tx.Create(orderModel).Error; err != nil {
return err
}
item := &models.OrderItem{
TenantID: params.TenantID,
UserID: params.UserID,
OrderID: orderModel.ID,
ContentID: params.ContentID,
ContentUserID: content.UserID,
AmountPaid: amountPaid,
Snapshot: itemSnapshot,
CreatedAt: now,
UpdatedAt: now,
}
if err := tx.Create(item).Error; err != nil {
return err
}
if err := s.grantAccess(ctx, tx, params.TenantID, params.UserID, params.ContentID, orderModel.ID, now); err != nil {
return err
}
var access models.ContentAccess
if err := tx.Table(models.TableNameContentAccess).
Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, params.UserID, params.ContentID).
First(&access).Error; err != nil {
return err
}
out.Order = orderModel
out.OrderItem = item
out.Access = &access
return nil
}
orderModel := &models.Order{
TenantID: params.TenantID,
UserID: params.UserID,
Type: consts.OrderTypeContentPurchase,
Status: consts.OrderStatusCreated,
Currency: consts.CurrencyCNY,
AmountOriginal: priceAmount,
AmountDiscount: priceAmount - amountPaid,
AmountPaid: amountPaid,
Snapshot: purchaseSnapshot,
IdempotencyKey: "",
CreatedAt: now,
UpdatedAt: now,
}
if _, err := s.ledger.FreezeTx(ctx, tx, params.TenantID, params.UserID, params.UserID, 0, amountPaid, "", "purchase freeze", now); err != nil {
return err
}
if err := tx.Create(orderModel).Error; err != nil {
return err
}
item := &models.OrderItem{
TenantID: params.TenantID,
UserID: params.UserID,
OrderID: orderModel.ID,
ContentID: params.ContentID,
ContentUserID: content.UserID,
AmountPaid: amountPaid,
Snapshot: itemSnapshot,
CreatedAt: now,
UpdatedAt: now,
}
if err := tx.Create(item).Error; err != nil {
return err
}
if _, err := s.ledger.DebitPurchaseTx(ctx, tx, params.TenantID, params.UserID, params.UserID, orderModel.ID, amountPaid, "", "purchase debit", now); err != nil {
return err
}
if err := tx.Model(&models.Order{}).
Where("id = ?", orderModel.ID).
Updates(map[string]any{
"status": consts.OrderStatusPaid,
"paid_at": now,
"updated_at": now,
}).Error; err != nil {
return err
}
// 关键点:上面是 DB 更新;这里同步更新内存对象,避免返回给调用方的状态仍为 created。
orderModel.Status = consts.OrderStatusPaid
orderModel.PaidAt = now
orderModel.UpdatedAt = now
if err := s.grantAccess(ctx, tx, params.TenantID, params.UserID, params.ContentID, orderModel.ID, now); err != nil {
return err
}
var access models.ContentAccess
if err := tx.Table(models.TableNameContentAccess).
Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, params.UserID, params.ContentID).
First(&access).Error; err != nil {
return err
}
out.Order = orderModel
out.OrderItem = item
out.Access = &access
return nil
})
if err != nil {
logrus.WithFields(logrus.Fields{
"tenant_id": params.TenantID,
"user_id": params.UserID,
"content_id": params.ContentID,
"idempotency_key": params.IdempotencyKey,
}).WithError(err).Warn("services.order.purchase_content.failed")
return nil, pkgerrors.Wrap(err, "purchase content failed")
}
logrus.WithFields(logrus.Fields{
"tenant_id": params.TenantID,
"user_id": params.UserID,
"content_id": params.ContentID,
"order_id": loID(out.Order),
"amount_paid": out.AmountPaid,
"idempotency_key": params.IdempotencyKey,
}).Info("services.order.purchase_content.ok")
return &out, nil
}
// computeFinalPrice returns the amount the buyer actually pays for a content
// item, applying the discount configured on price when it is in effect at now.
// Amounts are expressed in fen (the minor currency unit).
//
// Rules:
//   - a nil price or a non-positive priceAmount yields 0;
//   - if now falls before a non-zero DiscountStartAt or after a non-zero
//     DiscountEndAt, the undiscounted priceAmount is returned;
//   - a non-positive DiscountValue never changes the price;
//   - percent discounts of 100 or more, and amount discounts that reach or
//     exceed priceAmount, yield 0;
//   - any other discount type leaves the price unchanged.
func (s *order) computeFinalPrice(priceAmount int64, price *models.ContentPrice, now time.Time) int64 {
	if price == nil || priceAmount <= 0 {
		return 0
	}

	// A zero-valued bound means that side of the window is open.
	notStartedYet := !price.DiscountStartAt.IsZero() && now.Before(price.DiscountStartAt)
	alreadyEnded := !price.DiscountEndAt.IsZero() && now.After(price.DiscountEndAt)
	if notStartedYet || alreadyEnded {
		return priceAmount
	}

	// An unset discount type is treated as "no discount".
	kind := price.DiscountType
	if kind == "" {
		kind = consts.DiscountTypeNone
	}

	value := price.DiscountValue
	switch {
	case value <= 0:
		// No discount type applies a non-positive value.
		return priceAmount
	case kind == consts.DiscountTypePercent:
		if value >= 100 {
			return 0
		}
		// Integer division: any fractional fen is truncated.
		return priceAmount * (100 - value) / 100
	case kind == consts.DiscountTypeAmount:
		if value >= priceAmount {
			return 0
		}
		return priceAmount - value
	}
	return priceAmount
}
// grantAccess upserts the content-access row for (tenantID, userID, contentID)
// using the supplied transaction handle tx.
//
// When no row exists, one is inserted with status active and both created_at
// and updated_at set to now. When a row with the same
// (tenant_id, user_id, content_id) already exists, its order_id, status,
// revoked_at and updated_at are overwritten, so a repeated purchase or a retry
// leaves the access active. created_at is not touched on conflict.
//
// ctx is accepted for signature symmetry but is not used here; the statement
// runs with whatever context tx already carries.
func (s *order) grantAccess(
	ctx context.Context,
	tx *gorm.DB,
	tenantID, userID, contentID, orderID int64,
	now time.Time,
) error {
	// Values applied to an already existing row.
	onDuplicate := map[string]any{
		"order_id":   orderID,
		"status":     consts.ContentAccessStatusActive,
		"revoked_at": nil,
		"updated_at": now,
	}

	// Full row used when nothing exists yet for this tenant/user/content.
	row := map[string]any{
		"tenant_id":  tenantID,
		"user_id":    userID,
		"content_id": contentID,
		"order_id":   orderID,
		"status":     consts.ContentAccessStatusActive,
		"revoked_at": nil,
		"created_at": now,
		"updated_at": now,
	}

	upsert := clause.OnConflict{
		Columns: []clause.Column{
			{Name: "tenant_id"},
			{Name: "user_id"},
			{Name: "content_id"},
		},
		DoUpdates: clause.Assignments(onDuplicate),
	}

	return tx.Table(models.TableNameContentAccess).
		Clauses(upsert).
		Create(row).
		Error
}
// loID returns the order's ID, or 0 when m is nil, so callers can log an
// order ID without a nil check.
func loID(m *models.Order) int64 {
	if m != nil {
		return m.ID
	}
	return 0
}