feat: 优化购买内容的事务处理逻辑,支持幂等性和回滚机制
This commit is contained in:
@@ -419,32 +419,268 @@ func (s *order) PurchaseContent(ctx context.Context, params *PurchaseContentPara
|
||||
|
||||
var out PurchaseContentResult
|
||||
|
||||
err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
// If idempotency key is present, use a 3-step flow to ensure:
|
||||
// - freeze is committed first (reserve funds),
|
||||
// - order+debit are committed together,
|
||||
// - on debit failure, we unfreeze and persist a rollback marker so retries return "failed+rolled back".
|
||||
if params.IdempotencyKey != "" {
|
||||
var existing models.Order
|
||||
if err := tx.
|
||||
Preload("Items").
|
||||
Where("tenant_id = ? AND user_id = ? AND idempotency_key = ?", params.TenantID, params.UserID, params.IdempotencyKey).
|
||||
First(&existing).Error; err == nil {
|
||||
out.Order = &existing
|
||||
freezeKey := fmt.Sprintf("%s:freeze", params.IdempotencyKey)
|
||||
debitKey := fmt.Sprintf("%s:debit", params.IdempotencyKey)
|
||||
rollbackKey := fmt.Sprintf("%s:rollback", params.IdempotencyKey)
|
||||
|
||||
// 1) If we already have an order for this idempotency key, return it.
|
||||
{
|
||||
tbl, query := models.OrderQuery.QueryContext(ctx)
|
||||
existing, err := query.Preload(tbl.Items).Where(
|
||||
tbl.TenantID.Eq(params.TenantID),
|
||||
tbl.UserID.Eq(params.UserID),
|
||||
tbl.IdempotencyKey.Eq(params.IdempotencyKey),
|
||||
).First()
|
||||
if err == nil {
|
||||
out.Order = existing
|
||||
if len(existing.Items) > 0 {
|
||||
out.OrderItem = existing.Items[0]
|
||||
}
|
||||
if out.OrderItem != nil {
|
||||
var access models.ContentAccess
|
||||
if err := tx.
|
||||
Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, params.UserID, out.OrderItem.ContentID).
|
||||
First(&access).Error; err == nil {
|
||||
out.Access = &access
|
||||
}
|
||||
}
|
||||
out.AmountPaid = existing.AmountPaid
|
||||
return nil
|
||||
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return err
|
||||
if out.OrderItem != nil {
|
||||
aTbl, aQuery := models.ContentAccessQuery.QueryContext(ctx)
|
||||
access, err := aQuery.Where(
|
||||
aTbl.TenantID.Eq(params.TenantID),
|
||||
aTbl.UserID.Eq(params.UserID),
|
||||
aTbl.ContentID.Eq(out.OrderItem.ContentID),
|
||||
).First()
|
||||
if err == nil {
|
||||
out.Access = access
|
||||
}
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 2) If we previously rolled back this purchase, return stable failure.
|
||||
{
|
||||
tbl, query := models.TenantLedgerQuery.QueryContext(ctx)
|
||||
_, err := query.Where(
|
||||
tbl.TenantID.Eq(params.TenantID),
|
||||
tbl.UserID.Eq(params.UserID),
|
||||
tbl.IdempotencyKey.Eq(rollbackKey),
|
||||
).First()
|
||||
if err == nil {
|
||||
return nil, errorx.ErrOperationFailed.WithMsg("失败+已回滚")
|
||||
}
|
||||
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Load content + price outside tx for simplicity.
|
||||
var content models.Content
|
||||
{
|
||||
tbl, query := models.ContentQuery.QueryContext(ctx)
|
||||
m, err := query.Where(
|
||||
tbl.TenantID.Eq(params.TenantID),
|
||||
tbl.ID.Eq(params.ContentID),
|
||||
tbl.DeletedAt.IsNull(),
|
||||
).First()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
content = *m
|
||||
}
|
||||
if content.Status != consts.ContentStatusPublished {
|
||||
return nil, errorx.ErrPreconditionFailed.WithMsg("content not published")
|
||||
}
|
||||
|
||||
// owner shortcut
|
||||
if content.UserID == params.UserID {
|
||||
err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
if err := s.grantAccess(ctx, tx, params.TenantID, params.UserID, params.ContentID, 0, now); err != nil {
|
||||
return err
|
||||
}
|
||||
var access models.ContentAccess
|
||||
if err := tx.Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, params.UserID, params.ContentID).First(&access).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
out.AmountPaid = 0
|
||||
out.Access = &access
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
priceAmount := int64(0)
|
||||
var price models.ContentPrice
|
||||
{
|
||||
tbl, query := models.ContentPriceQuery.QueryContext(ctx)
|
||||
m, err := query.Where(
|
||||
tbl.TenantID.Eq(params.TenantID),
|
||||
tbl.ContentID.Eq(params.ContentID),
|
||||
).First()
|
||||
if err == nil {
|
||||
price = *m
|
||||
priceAmount = m.PriceAmount
|
||||
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
amountPaid := s.computeFinalPrice(priceAmount, &price, now)
|
||||
out.AmountPaid = amountPaid
|
||||
|
||||
// free path: no freeze needed; keep single tx.
|
||||
if amountPaid == 0 {
|
||||
err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
orderModel := &models.Order{
|
||||
TenantID: params.TenantID,
|
||||
UserID: params.UserID,
|
||||
Type: consts.OrderTypeContentPurchase,
|
||||
Status: consts.OrderStatusPaid,
|
||||
Currency: consts.CurrencyCNY,
|
||||
AmountOriginal: priceAmount,
|
||||
AmountDiscount: priceAmount - amountPaid,
|
||||
AmountPaid: amountPaid,
|
||||
Snapshot: types.JSON([]byte("{}")),
|
||||
IdempotencyKey: params.IdempotencyKey,
|
||||
PaidAt: now,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
if err := tx.Create(orderModel).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
item := &models.OrderItem{
|
||||
TenantID: params.TenantID,
|
||||
UserID: params.UserID,
|
||||
OrderID: orderModel.ID,
|
||||
ContentID: params.ContentID,
|
||||
ContentUserID: content.UserID,
|
||||
AmountPaid: amountPaid,
|
||||
Snapshot: types.JSON([]byte("{}")),
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
if err := tx.Create(item).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.grantAccess(ctx, tx, params.TenantID, params.UserID, params.ContentID, orderModel.ID, now); err != nil {
|
||||
return err
|
||||
}
|
||||
var access models.ContentAccess
|
||||
if err := tx.Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, params.UserID, params.ContentID).First(&access).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
out.Order = orderModel
|
||||
out.OrderItem = item
|
||||
out.Access = &access
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, pkgerrors.Wrap(err, "purchase content failed")
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
// 3) Freeze in its own transaction so we can compensate later.
|
||||
if err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
_, err := s.ledger.FreezeTx(ctx, tx, params.TenantID, params.UserID, 0, amountPaid, freezeKey, "purchase freeze", now)
|
||||
return err
|
||||
}); err != nil {
|
||||
return nil, pkgerrors.Wrap(err, "purchase freeze failed")
|
||||
}
|
||||
|
||||
// 4) Create order + debit + access in a single transaction.
|
||||
if err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
orderModel := &models.Order{
|
||||
TenantID: params.TenantID,
|
||||
UserID: params.UserID,
|
||||
Type: consts.OrderTypeContentPurchase,
|
||||
Status: consts.OrderStatusCreated,
|
||||
Currency: consts.CurrencyCNY,
|
||||
AmountOriginal: priceAmount,
|
||||
AmountDiscount: priceAmount - amountPaid,
|
||||
AmountPaid: amountPaid,
|
||||
Snapshot: types.JSON([]byte("{}")),
|
||||
IdempotencyKey: params.IdempotencyKey,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
if err := tx.Create(orderModel).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
item := &models.OrderItem{
|
||||
TenantID: params.TenantID,
|
||||
UserID: params.UserID,
|
||||
OrderID: orderModel.ID,
|
||||
ContentID: params.ContentID,
|
||||
ContentUserID: content.UserID,
|
||||
AmountPaid: amountPaid,
|
||||
Snapshot: types.JSON([]byte("{}")),
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
if err := tx.Create(item).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := s.ledger.DebitPurchaseTx(ctx, tx, params.TenantID, params.UserID, orderModel.ID, amountPaid, debitKey, "purchase debit", now); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := tx.Model(&models.Order{}).
|
||||
Where("id = ?", orderModel.ID).
|
||||
Updates(map[string]any{
|
||||
"status": consts.OrderStatusPaid,
|
||||
"paid_at": now,
|
||||
"updated_at": now,
|
||||
}).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.grantAccess(ctx, tx, params.TenantID, params.UserID, params.ContentID, orderModel.ID, now); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var access models.ContentAccess
|
||||
if err := tx.Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, params.UserID, params.ContentID).First(&access).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
out.Order = orderModel
|
||||
out.OrderItem = item
|
||||
out.Access = &access
|
||||
return nil
|
||||
}); err != nil {
|
||||
// 5) Compensate: unfreeze and persist rollback marker.
|
||||
_ = s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
_, e1 := s.ledger.UnfreezeTx(ctx, tx, params.TenantID, params.UserID, 0, amountPaid, rollbackKey, "purchase rollback", now)
|
||||
return e1
|
||||
})
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"tenant_id": params.TenantID,
|
||||
"user_id": params.UserID,
|
||||
"content_id": params.ContentID,
|
||||
"idempotency_key": params.IdempotencyKey,
|
||||
}).WithError(err).Warn("services.order.purchase_content.rollback")
|
||||
return nil, errorx.ErrOperationFailed.WithMsg("失败+已回滚")
|
||||
}
|
||||
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"tenant_id": params.TenantID,
|
||||
"user_id": params.UserID,
|
||||
"content_id": params.ContentID,
|
||||
"order_id": loID(out.Order),
|
||||
"amount_paid": out.AmountPaid,
|
||||
"idempotency_key": params.IdempotencyKey,
|
||||
}).Info("services.order.purchase_content.ok")
|
||||
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
// Legacy atomic transaction path for requests without idempotency key.
|
||||
err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
var content models.Content
|
||||
if err := tx.
|
||||
Where("tenant_id = ? AND id = ? AND deleted_at IS NULL", params.TenantID, params.ContentID).
|
||||
@@ -458,19 +694,6 @@ func (s *order) PurchaseContent(ctx context.Context, params *PurchaseContentPara
|
||||
return errorx.ErrPreconditionFailed.WithMsg("content not published")
|
||||
}
|
||||
|
||||
if content.UserID == params.UserID {
|
||||
out.AmountPaid = 0
|
||||
if err := s.grantAccess(ctx, tx, params.TenantID, params.UserID, params.ContentID, 0, now); err != nil {
|
||||
return err
|
||||
}
|
||||
var access models.ContentAccess
|
||||
if err := tx.Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, params.UserID, params.ContentID).First(&access).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
out.Access = &access
|
||||
return nil
|
||||
}
|
||||
|
||||
var accessExisting models.ContentAccess
|
||||
if err := tx.
|
||||
Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, params.UserID, params.ContentID).
|
||||
@@ -505,7 +728,7 @@ func (s *order) PurchaseContent(ctx context.Context, params *PurchaseContentPara
|
||||
AmountDiscount: priceAmount - amountPaid,
|
||||
AmountPaid: amountPaid,
|
||||
Snapshot: types.JSON([]byte("{}")),
|
||||
IdempotencyKey: params.IdempotencyKey,
|
||||
IdempotencyKey: "",
|
||||
PaidAt: now,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
@@ -550,16 +773,12 @@ func (s *order) PurchaseContent(ctx context.Context, params *PurchaseContentPara
|
||||
AmountDiscount: priceAmount - amountPaid,
|
||||
AmountPaid: amountPaid,
|
||||
Snapshot: types.JSON([]byte("{}")),
|
||||
IdempotencyKey: params.IdempotencyKey,
|
||||
IdempotencyKey: "",
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
freezeKey := fmt.Sprintf("%s:freeze", params.IdempotencyKey)
|
||||
if params.IdempotencyKey == "" {
|
||||
freezeKey = ""
|
||||
}
|
||||
if _, err := s.ledger.FreezeTx(ctx, tx, params.TenantID, params.UserID, 0, amountPaid, freezeKey, "purchase freeze", now); err != nil {
|
||||
if _, err := s.ledger.FreezeTx(ctx, tx, params.TenantID, params.UserID, 0, amountPaid, "", "purchase freeze", now); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -582,11 +801,7 @@ func (s *order) PurchaseContent(ctx context.Context, params *PurchaseContentPara
|
||||
return err
|
||||
}
|
||||
|
||||
debitKey := fmt.Sprintf("%s:debit", params.IdempotencyKey)
|
||||
if params.IdempotencyKey == "" {
|
||||
debitKey = ""
|
||||
}
|
||||
if _, err := s.ledger.DebitPurchaseTx(ctx, tx, params.TenantID, params.UserID, orderModel.ID, amountPaid, debitKey, "purchase debit", now); err != nil {
|
||||
if _, err := s.ledger.DebitPurchaseTx(ctx, tx, params.TenantID, params.UserID, orderModel.ID, amountPaid, "", "purchase debit", now); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user