fix: retry critical writes and allow super login
This commit is contained in:
@@ -62,6 +62,9 @@ func (m *Middlewares) Auth(ctx fiber.Ctx) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Middlewares) SuperAuth(ctx fiber.Ctx) error {
|
func (m *Middlewares) SuperAuth(ctx fiber.Ctx) error {
|
||||||
|
if isSuperPublicRoute(ctx) {
|
||||||
|
return ctx.Next()
|
||||||
|
}
|
||||||
authHeader := ctx.Get("Authorization")
|
authHeader := ctx.Get("Authorization")
|
||||||
if authHeader == "" {
|
if authHeader == "" {
|
||||||
return errorx.ErrUnauthorized.WithMsg("Missing token")
|
return errorx.ErrUnauthorized.WithMsg("Missing token")
|
||||||
@@ -130,3 +133,14 @@ func isPublicRoute(ctx fiber.Ctx) bool {
|
|||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isSuperPublicRoute(ctx fiber.Ctx) bool {
|
||||||
|
path := ctx.Path()
|
||||||
|
method := ctx.Method()
|
||||||
|
|
||||||
|
if method == fiber.MethodPost && path == "/super/v1/auth/login" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"mime/multipart"
|
"mime/multipart"
|
||||||
"os"
|
"os"
|
||||||
@@ -24,6 +25,7 @@ import (
|
|||||||
"quyun/v2/providers/storage"
|
"quyun/v2/providers/storage"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
"github.com/jackc/pgconn"
|
||||||
"go.ipao.vip/gen/types"
|
"go.ipao.vip/gen/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -453,3 +455,57 @@ func (s *common) GetAssetURL(objectKey string) string {
|
|||||||
url, _ := s.storage.SignURL("GET", objectKey, 1*time.Hour)
|
url, _ := s.storage.SignURL("GET", objectKey, 1*time.Hour)
|
||||||
return url
|
return url
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func retryCriticalWrite(ctx context.Context, fn func() error) error {
|
||||||
|
backoffs := []time.Duration{
|
||||||
|
50 * time.Millisecond,
|
||||||
|
120 * time.Millisecond,
|
||||||
|
250 * time.Millisecond,
|
||||||
|
}
|
||||||
|
var lastErr error
|
||||||
|
|
||||||
|
for attempt := 0; attempt <= len(backoffs); attempt++ {
|
||||||
|
if err := fn(); err != nil {
|
||||||
|
if !shouldRetryWrite(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
lastErr = err
|
||||||
|
if attempt >= len(backoffs) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// 事务冲突/死锁等短暂错误,等待后重试。
|
||||||
|
if ctx != nil {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return errorx.ErrServiceTimeout.WithCause(ctx.Err())
|
||||||
|
case <-time.After(backoffs[attempt]):
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
time.Sleep(backoffs[attempt])
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func shouldRetryWrite(err error) bool {
|
||||||
|
var appErr *errorx.AppError
|
||||||
|
if errors.As(err, &appErr) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return isTransientDBError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func isTransientDBError(err error) bool {
|
||||||
|
var pgErr *pgconn.PgError
|
||||||
|
if errors.As(err, &pgErr) {
|
||||||
|
switch pgErr.Code {
|
||||||
|
case "40001", "40P01":
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|||||||
@@ -665,70 +665,73 @@ func (s *creator) ProcessRefund(ctx context.Context, userID, id int64, form *cre
|
|||||||
}
|
}
|
||||||
|
|
||||||
if form.Action == "accept" {
|
if form.Action == "accept" {
|
||||||
return models.Q.Transaction(func(tx *models.Query) error {
|
// 关键退款事务:遇到数据库冲突/死锁时短暂退避重试,避免退款卡住。
|
||||||
// 1. Deduct Creator Balance
|
return retryCriticalWrite(ctx, func() error {
|
||||||
// We credited Creator User Balance in Order.Pay. Now deduct it.
|
return models.Q.Transaction(func(tx *models.Query) error {
|
||||||
info, err := tx.User.WithContext(ctx).
|
// 1. Deduct Creator Balance
|
||||||
Where(tx.User.ID.Eq(uid), tx.User.Balance.Gte(o.AmountPaid)).
|
// We credited Creator User Balance in Order.Pay. Now deduct it.
|
||||||
Update(tx.User.Balance, gorm.Expr("balance - ?", o.AmountPaid))
|
info, err := tx.User.WithContext(ctx).
|
||||||
if err != nil {
|
Where(tx.User.ID.Eq(uid), tx.User.Balance.Gte(o.AmountPaid)).
|
||||||
return err
|
Update(tx.User.Balance, gorm.Expr("balance - ?", o.AmountPaid))
|
||||||
}
|
|
||||||
if info.RowsAffected == 0 {
|
|
||||||
return errorx.ErrQuotaExceeded.WithMsg("余额不足,无法退款")
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. Credit Buyer Balance
|
|
||||||
_, err = tx.User.WithContext(ctx).
|
|
||||||
Where(tx.User.ID.Eq(o.UserID)).
|
|
||||||
Update(tx.User.Balance, gorm.Expr("balance + ?", o.AmountPaid))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Update Order Status
|
|
||||||
_, err = tx.Order.WithContext(ctx).Where(tx.Order.ID.Eq(id)).Updates(&models.Order{
|
|
||||||
Status: consts.OrderStatusRefunded,
|
|
||||||
RefundedAt: time.Now(),
|
|
||||||
RefundOperatorUserID: uid,
|
|
||||||
RefundReason: form.Reason,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// 4. Revoke Content Access
|
|
||||||
// Fetch order items to get content IDs
|
|
||||||
items, _ := tx.OrderItem.WithContext(ctx).Where(tx.OrderItem.OrderID.Eq(o.ID)).Find()
|
|
||||||
contentIDs := make([]int64, len(items))
|
|
||||||
for i, item := range items {
|
|
||||||
contentIDs[i] = item.ContentID
|
|
||||||
}
|
|
||||||
if len(contentIDs) > 0 {
|
|
||||||
_, err = tx.ContentAccess.WithContext(ctx).
|
|
||||||
Where(tx.ContentAccess.UserID.Eq(o.UserID), tx.ContentAccess.ContentID.In(contentIDs...)).
|
|
||||||
UpdateSimple(tx.ContentAccess.Status.Value(consts.ContentAccessStatusRevoked))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
if info.RowsAffected == 0 {
|
||||||
|
return errorx.ErrQuotaExceeded.WithMsg("余额不足,无法退款")
|
||||||
|
}
|
||||||
|
|
||||||
// 5. Create Tenant Ledger
|
// 2. Credit Buyer Balance
|
||||||
ledger := &models.TenantLedger{
|
_, err = tx.User.WithContext(ctx).
|
||||||
TenantID: tid,
|
Where(tx.User.ID.Eq(o.UserID)).
|
||||||
UserID: uid,
|
Update(tx.User.Balance, gorm.Expr("balance + ?", o.AmountPaid))
|
||||||
OrderID: o.ID,
|
if err != nil {
|
||||||
Type: consts.TenantLedgerTypeCreditRefund,
|
return err
|
||||||
Amount: o.AmountPaid,
|
}
|
||||||
Remark: "退款: " + form.Reason,
|
|
||||||
OperatorUserID: uid,
|
|
||||||
IdempotencyKey: uuid.NewString(),
|
|
||||||
}
|
|
||||||
if err := tx.TenantLedger.WithContext(ctx).Create(ledger); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
// 3. Update Order Status
|
||||||
|
_, err = tx.Order.WithContext(ctx).Where(tx.Order.ID.Eq(id)).Updates(&models.Order{
|
||||||
|
Status: consts.OrderStatusRefunded,
|
||||||
|
RefundedAt: time.Now(),
|
||||||
|
RefundOperatorUserID: uid,
|
||||||
|
RefundReason: form.Reason,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Revoke Content Access
|
||||||
|
// Fetch order items to get content IDs
|
||||||
|
items, _ := tx.OrderItem.WithContext(ctx).Where(tx.OrderItem.OrderID.Eq(o.ID)).Find()
|
||||||
|
contentIDs := make([]int64, len(items))
|
||||||
|
for i, item := range items {
|
||||||
|
contentIDs[i] = item.ContentID
|
||||||
|
}
|
||||||
|
if len(contentIDs) > 0 {
|
||||||
|
_, err = tx.ContentAccess.WithContext(ctx).
|
||||||
|
Where(tx.ContentAccess.UserID.Eq(o.UserID), tx.ContentAccess.ContentID.In(contentIDs...)).
|
||||||
|
UpdateSimple(tx.ContentAccess.Status.Value(consts.ContentAccessStatusRevoked))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. Create Tenant Ledger
|
||||||
|
ledger := &models.TenantLedger{
|
||||||
|
TenantID: tid,
|
||||||
|
UserID: uid,
|
||||||
|
OrderID: o.ID,
|
||||||
|
Type: consts.TenantLedgerTypeCreditRefund,
|
||||||
|
Amount: o.AmountPaid,
|
||||||
|
Remark: "退款: " + form.Reason,
|
||||||
|
OperatorUserID: uid,
|
||||||
|
IdempotencyKey: uuid.NewString(),
|
||||||
|
}
|
||||||
|
if err := tx.TenantLedger.WithContext(ctx).Create(ledger); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -247,106 +247,110 @@ func (s *order) payWithBalance(ctx context.Context, o *models.Order) (*transacti
|
|||||||
|
|
||||||
func (s *order) settleOrder(ctx context.Context, o *models.Order, method, externalID string) error {
|
func (s *order) settleOrder(ctx context.Context, o *models.Order, method, externalID string) error {
|
||||||
var tenantOwnerID int64
|
var tenantOwnerID int64
|
||||||
err := models.Q.Transaction(func(tx *models.Query) error {
|
// 关键结算事务:遇到数据库冲突/死锁时短暂退避重试,避免支付状态卡死。
|
||||||
// 1. Handle Balance Updates
|
err := retryCriticalWrite(ctx, func() error {
|
||||||
if o.Type == consts.OrderTypeRecharge {
|
tenantOwnerID = 0
|
||||||
// Income: Recharge (Credit User Balance)
|
return models.Q.Transaction(func(tx *models.Query) error {
|
||||||
_, err := tx.User.WithContext(ctx).
|
// 1. Handle Balance Updates
|
||||||
Where(tx.User.ID.Eq(o.UserID)).
|
if o.Type == consts.OrderTypeRecharge {
|
||||||
Update(tx.User.Balance, gorm.Expr("balance + ?", o.AmountPaid))
|
// Income: Recharge (Credit User Balance)
|
||||||
if err != nil {
|
_, err := tx.User.WithContext(ctx).
|
||||||
return err
|
Where(tx.User.ID.Eq(o.UserID)).
|
||||||
|
Update(tx.User.Balance, gorm.Expr("balance + ?", o.AmountPaid))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else if method == "balance" {
|
||||||
|
// Expense: Purchase with Balance (Deduct User Balance)
|
||||||
|
info, err := tx.User.WithContext(ctx).
|
||||||
|
Where(tx.User.ID.Eq(o.UserID), tx.User.Balance.Gte(o.AmountPaid)).
|
||||||
|
Update(tx.User.Balance, gorm.Expr("balance - ?", o.AmountPaid))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if info.RowsAffected == 0 {
|
||||||
|
return errorx.ErrQuotaExceeded.WithMsg("余额不足")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if method == "balance" {
|
|
||||||
// Expense: Purchase with Balance (Deduct User Balance)
|
|
||||||
info, err := tx.User.WithContext(ctx).
|
|
||||||
Where(tx.User.ID.Eq(o.UserID), tx.User.Balance.Gte(o.AmountPaid)).
|
|
||||||
Update(tx.User.Balance, gorm.Expr("balance - ?", o.AmountPaid))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if info.RowsAffected == 0 {
|
|
||||||
return errorx.ErrQuotaExceeded.WithMsg("余额不足")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. Update Order Status
|
// 2. Update Order Status
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
// snapshot := o.Snapshot // Preserve existing snapshot or update it with external ID
|
// snapshot := o.Snapshot // Preserve existing snapshot or update it with external ID
|
||||||
// TODO: Update snapshot with payment info
|
// TODO: Update snapshot with payment info
|
||||||
_, err := tx.Order.WithContext(ctx).Where(tx.Order.ID.Eq(o.ID)).Updates(&models.Order{
|
_, err := tx.Order.WithContext(ctx).Where(tx.Order.ID.Eq(o.ID)).Updates(&models.Order{
|
||||||
Status: consts.OrderStatusPaid,
|
Status: consts.OrderStatusPaid,
|
||||||
PaidAt: now,
|
PaidAt: now,
|
||||||
UpdatedAt: now,
|
UpdatedAt: now,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Grant Content Access
|
||||||
|
items, _ := tx.OrderItem.WithContext(ctx).Where(tx.OrderItem.OrderID.Eq(o.ID)).Find()
|
||||||
|
for _, item := range items {
|
||||||
|
// Check if access already exists (idempotency)
|
||||||
|
exists, _ := tx.ContentAccess.WithContext(ctx).
|
||||||
|
Where(tx.ContentAccess.UserID.Eq(o.UserID), tx.ContentAccess.ContentID.Eq(item.ContentID)).
|
||||||
|
Exists()
|
||||||
|
if exists {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
access := &models.ContentAccess{
|
||||||
|
TenantID: item.TenantID,
|
||||||
|
UserID: o.UserID,
|
||||||
|
ContentID: item.ContentID,
|
||||||
|
OrderID: o.ID,
|
||||||
|
Status: consts.ContentAccessStatusActive,
|
||||||
|
}
|
||||||
|
if err := tx.ContentAccess.WithContext(ctx).Save(access); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Create Tenant Ledger (Revenue) - Only for Content Purchase
|
||||||
|
if o.Type == consts.OrderTypeContentPurchase {
|
||||||
|
t, err := tx.Tenant.WithContext(ctx).Where(tx.Tenant.ID.Eq(o.TenantID)).First()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tenantOwnerID = t.UserID
|
||||||
|
|
||||||
|
// Calculate Commission
|
||||||
|
amount := o.AmountPaid
|
||||||
|
fee := int64(float64(amount) * 0.10)
|
||||||
|
creatorIncome := amount - fee
|
||||||
|
|
||||||
|
// Credit Tenant Owner Balance (Net Income)
|
||||||
|
_, err = tx.User.WithContext(ctx).
|
||||||
|
Where(tx.User.ID.Eq(tenantOwnerID)).
|
||||||
|
Update(tx.User.Balance, gorm.Expr("balance + ?", creatorIncome))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ledger := &models.TenantLedger{
|
||||||
|
TenantID: o.TenantID,
|
||||||
|
UserID: t.UserID, // Owner
|
||||||
|
OrderID: o.ID,
|
||||||
|
Type: consts.TenantLedgerTypeDebitPurchase, // Income from purchase
|
||||||
|
Amount: creatorIncome,
|
||||||
|
BalanceBefore: 0, // TODO
|
||||||
|
BalanceAfter: 0, // TODO
|
||||||
|
FrozenBefore: 0,
|
||||||
|
FrozenAfter: 0,
|
||||||
|
IdempotencyKey: uuid.NewString(),
|
||||||
|
Remark: "内容销售收入 (扣除平台费)",
|
||||||
|
OperatorUserID: o.UserID,
|
||||||
|
}
|
||||||
|
if err := tx.TenantLedger.WithContext(ctx).Create(ledger); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Grant Content Access
|
|
||||||
items, _ := tx.OrderItem.WithContext(ctx).Where(tx.OrderItem.OrderID.Eq(o.ID)).Find()
|
|
||||||
for _, item := range items {
|
|
||||||
// Check if access already exists (idempotency)
|
|
||||||
exists, _ := tx.ContentAccess.WithContext(ctx).
|
|
||||||
Where(tx.ContentAccess.UserID.Eq(o.UserID), tx.ContentAccess.ContentID.Eq(item.ContentID)).
|
|
||||||
Exists()
|
|
||||||
if exists {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
access := &models.ContentAccess{
|
|
||||||
TenantID: item.TenantID,
|
|
||||||
UserID: o.UserID,
|
|
||||||
ContentID: item.ContentID,
|
|
||||||
OrderID: o.ID,
|
|
||||||
Status: consts.ContentAccessStatusActive,
|
|
||||||
}
|
|
||||||
if err := tx.ContentAccess.WithContext(ctx).Save(access); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 4. Create Tenant Ledger (Revenue) - Only for Content Purchase
|
|
||||||
if o.Type == consts.OrderTypeContentPurchase {
|
|
||||||
t, err := tx.Tenant.WithContext(ctx).Where(tx.Tenant.ID.Eq(o.TenantID)).First()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
tenantOwnerID = t.UserID
|
|
||||||
|
|
||||||
// Calculate Commission
|
|
||||||
amount := o.AmountPaid
|
|
||||||
fee := int64(float64(amount) * 0.10)
|
|
||||||
creatorIncome := amount - fee
|
|
||||||
|
|
||||||
// Credit Tenant Owner Balance (Net Income)
|
|
||||||
_, err = tx.User.WithContext(ctx).
|
|
||||||
Where(tx.User.ID.Eq(tenantOwnerID)).
|
|
||||||
Update(tx.User.Balance, gorm.Expr("balance + ?", creatorIncome))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
ledger := &models.TenantLedger{
|
|
||||||
TenantID: o.TenantID,
|
|
||||||
UserID: t.UserID, // Owner
|
|
||||||
OrderID: o.ID,
|
|
||||||
Type: consts.TenantLedgerTypeDebitPurchase, // Income from purchase
|
|
||||||
Amount: creatorIncome,
|
|
||||||
BalanceBefore: 0, // TODO
|
|
||||||
BalanceAfter: 0, // TODO
|
|
||||||
FrozenBefore: 0,
|
|
||||||
FrozenAfter: 0,
|
|
||||||
IdempotencyKey: uuid.NewString(),
|
|
||||||
Remark: "内容销售收入 (扣除平台费)",
|
|
||||||
OperatorUserID: o.UserID,
|
|
||||||
}
|
|
||||||
if err := tx.TenantLedger.WithContext(ctx).Create(ledger); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
Reference in New Issue
Block a user