Files
quyun-v2/backend/app/services/ledger.go

482 lines
17 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package services
import (
"context"
"errors"
"time"
"quyun/v2/app/errorx"
"quyun/v2/app/http/tenant/dto"
"quyun/v2/app/requests"
"quyun/v2/database/models"
"quyun/v2/pkg/consts"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"go.ipao.vip/gen"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// LedgerApplyResult 表示一次账本写入(含幂等命中)的结果,包含账本记录与用户余额快照。
type LedgerApplyResult struct {
// Ledger 为本次创建的账本记录(若幂等命中则返回已有记录)。
Ledger *models.TenantLedger
// User 为写入后余额状态(若幂等命中则返回当前快照)。
User *models.User
}
// ledger 提供租户账本能力(冻结/解冻/扣减/退款等),支持幂等与行锁保证一致性。
// 注意:余额为 users 表的全局余额,用户可在已加入租户间共享消费。
//
// @provider
type ledger struct {
db *gorm.DB
}
// MyBalance 查询当前用户的全局余额信息(可用/冻结)。
// 语义:必须先是该租户成员(否则返回 not found但余额数据来源为 users。
func (s *ledger) MyBalance(ctx context.Context, tenantID, userID int64) (*models.User, error) {
if tenantID <= 0 || userID <= 0 {
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/user_id must be > 0")
}
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"user_id": userID,
}).Info("services.ledger.me.balance")
// 必须先是租户成员。
tblTU, queryTU := models.TenantUserQuery.QueryContext(ctx)
if _, err := queryTU.Where(tblTU.TenantID.Eq(tenantID), tblTU.UserID.Eq(userID)).First(); err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, errorx.ErrRecordNotFound.WithMsg("tenant user not found")
}
return nil, err
}
tblU, queryU := models.UserQuery.QueryContext(ctx)
m, err := queryU.Where(tblU.ID.Eq(userID), tblU.DeletedAt.IsNull()).First()
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, errorx.ErrRecordNotFound.WithMsg("user not found")
}
return nil, err
}
return m, nil
}
// MyLedgerPage 分页查询当前用户在指定租户下的余额流水(用于“我的流水”)。
func (s *ledger) MyLedgerPage(ctx context.Context, tenantID, userID int64, filter *dto.MyLedgerListFilter) (*requests.Pager, error) {
if tenantID <= 0 || userID <= 0 {
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/user_id must be > 0")
}
if filter == nil {
filter = &dto.MyLedgerListFilter{}
}
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"user_id": userID,
"type": lo.FromPtr(filter.Type),
"order_id": lo.FromPtr(filter.OrderID),
}).Info("services.ledger.me.ledgers.page")
filter.Pagination.Format()
tbl, query := models.TenantLedgerQuery.QueryContext(ctx)
conds := []gen.Condition{
tbl.TenantID.Eq(tenantID),
tbl.UserID.Eq(userID),
}
if filter.Type != nil {
conds = append(conds, tbl.Type.Eq(*filter.Type))
}
if filter.OrderID != nil && *filter.OrderID > 0 {
conds = append(conds, tbl.OrderID.Eq(*filter.OrderID))
}
if filter.CreatedAtFrom != nil {
conds = append(conds, tbl.CreatedAt.Gte(*filter.CreatedAtFrom))
}
if filter.CreatedAtTo != nil {
conds = append(conds, tbl.CreatedAt.Lte(*filter.CreatedAtTo))
}
ledgers, total, err := query.Where(conds...).Order(tbl.ID.Desc()).FindByPage(int(filter.Offset()), int(filter.Limit))
if err != nil {
return nil, err
}
items := lo.Map(ledgers, func(m *models.TenantLedger, _ int) *dto.MyLedgerItem {
return &dto.MyLedgerItem{
Ledger: m,
TypeDescription: m.Type.Description(),
}
})
return &requests.Pager{
Pagination: filter.Pagination,
Total: total,
Items: items,
}, nil
}
// AdminLedgerPage 分页查询租户内余额流水(租户后台审计用)。
func (s *ledger) AdminLedgerPage(ctx context.Context, tenantID int64, filter *dto.AdminLedgerListFilter) (*requests.Pager, error) {
if tenantID <= 0 {
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id must be > 0")
}
if filter == nil {
filter = &dto.AdminLedgerListFilter{}
}
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"operator_user_id": lo.FromPtr(filter.OperatorUserID),
"user_id": lo.FromPtr(filter.UserID),
"type": lo.FromPtr(filter.Type),
"order_id": lo.FromPtr(filter.OrderID),
"biz_ref_type": lo.FromPtr(filter.BizRefType),
"biz_ref_id": lo.FromPtr(filter.BizRefID),
"created_at_from": filter.CreatedAtFrom,
"created_at_to": filter.CreatedAtTo,
"pagination_page": filter.Page,
"pagination_limit": filter.Limit,
"pagination_offset": filter.Offset(),
}).Info("services.ledger.admin.page")
filter.Pagination.Format()
tbl, query := models.TenantLedgerQuery.QueryContext(ctx)
conds := []gen.Condition{tbl.TenantID.Eq(tenantID)}
if filter.OperatorUserID != nil && *filter.OperatorUserID > 0 {
conds = append(conds, tbl.OperatorUserID.Eq(*filter.OperatorUserID))
}
if filter.UserID != nil && *filter.UserID > 0 {
conds = append(conds, tbl.UserID.Eq(*filter.UserID))
}
if filter.Type != nil {
conds = append(conds, tbl.Type.Eq(*filter.Type))
}
if filter.OrderID != nil && *filter.OrderID > 0 {
conds = append(conds, tbl.OrderID.Eq(*filter.OrderID))
}
if filter.BizRefType != nil && *filter.BizRefType != "" {
conds = append(conds, tbl.BizRefType.Eq(*filter.BizRefType))
}
if filter.BizRefID != nil && *filter.BizRefID > 0 {
conds = append(conds, tbl.BizRefID.Eq(*filter.BizRefID))
}
if filter.CreatedAtFrom != nil {
conds = append(conds, tbl.CreatedAt.Gte(*filter.CreatedAtFrom))
}
if filter.CreatedAtTo != nil {
conds = append(conds, tbl.CreatedAt.Lte(*filter.CreatedAtTo))
}
ledgers, total, err := query.Where(conds...).Order(tbl.ID.Desc()).FindByPage(int(filter.Offset()), int(filter.Limit))
if err != nil {
return nil, err
}
items := lo.Map(ledgers, func(m *models.TenantLedger, _ int) *dto.AdminLedgerItem {
return &dto.AdminLedgerItem{
Ledger: m,
TypeDescription: m.Type.Description(),
}
})
return &requests.Pager{
Pagination: filter.Pagination,
Total: total,
Items: items,
}, nil
}
// Freeze 将可用余额转入冻结余额,并写入账本记录。
func (s *ledger) Freeze(ctx context.Context, tenantID, userID, orderID, amount int64, idempotencyKey, remark string, now time.Time) (*LedgerApplyResult, error) {
// 冻结通常由用户自己发起(下单冻结);操作者默认等于余额账户归属 user_id。
bizRefType := ""
if orderID > 0 {
bizRefType = "order"
}
return s.apply(ctx, s.db, tenantID, userID, userID, orderID, bizRefType, orderID, consts.TenantLedgerTypeFreeze, amount, -amount, amount, idempotencyKey, remark, now)
}
// Unfreeze 将冻结余额转回可用余额,并写入账本记录。
func (s *ledger) Unfreeze(ctx context.Context, tenantID, userID, orderID, amount int64, idempotencyKey, remark string, now time.Time) (*LedgerApplyResult, error) {
// 解冻通常由用户自己发起(失败回滚/退款回滚等可能由系统或管理员触发;此处默认等于 user_id
bizRefType := ""
if orderID > 0 {
bizRefType = "order"
}
return s.apply(ctx, s.db, tenantID, userID, userID, orderID, bizRefType, orderID, consts.TenantLedgerTypeUnfreeze, amount, amount, -amount, idempotencyKey, remark, now)
}
// FreezeTx 为 Freeze 的事务版本(由外层事务控制提交/回滚)。
func (s *ledger) FreezeTx(ctx context.Context, tx *gorm.DB, tenantID, operatorUserID, userID, orderID, amount int64, idempotencyKey, remark string, now time.Time) (*LedgerApplyResult, error) {
bizRefType := ""
if orderID > 0 {
bizRefType = "order"
}
return s.apply(ctx, tx, tenantID, operatorUserID, userID, orderID, bizRefType, orderID, consts.TenantLedgerTypeFreeze, amount, -amount, amount, idempotencyKey, remark, now)
}
// UnfreezeTx 为 Unfreeze 的事务版本(由外层事务控制提交/回滚)。
func (s *ledger) UnfreezeTx(ctx context.Context, tx *gorm.DB, tenantID, operatorUserID, userID, orderID, amount int64, idempotencyKey, remark string, now time.Time) (*LedgerApplyResult, error) {
bizRefType := ""
if orderID > 0 {
bizRefType = "order"
}
return s.apply(ctx, tx, tenantID, operatorUserID, userID, orderID, bizRefType, orderID, consts.TenantLedgerTypeUnfreeze, amount, amount, -amount, idempotencyKey, remark, now)
}
// DebitPurchaseTx 将冻结资金转为实际扣款(减少冻结余额),并写入账本记录。
func (s *ledger) DebitPurchaseTx(ctx context.Context, tx *gorm.DB, tenantID, operatorUserID, userID, orderID, amount int64, idempotencyKey, remark string, now time.Time) (*LedgerApplyResult, error) {
return s.apply(ctx, tx, tenantID, operatorUserID, userID, orderID, "order", orderID, consts.TenantLedgerTypeDebitPurchase, amount, 0, -amount, idempotencyKey, remark, now)
}
// CreditRefundTx 将退款金额退回到可用余额,并写入账本记录。
func (s *ledger) CreditRefundTx(ctx context.Context, tx *gorm.DB, tenantID, operatorUserID, userID, orderID, amount int64, idempotencyKey, remark string, now time.Time) (*LedgerApplyResult, error) {
return s.apply(ctx, tx, tenantID, operatorUserID, userID, orderID, "order", orderID, consts.TenantLedgerTypeCreditRefund, amount, amount, 0, idempotencyKey, remark, now)
}
func (s *ledger) apply(
ctx context.Context,
tx *gorm.DB,
tenantID, operatorUserID, userID, orderID int64,
bizRefType string, bizRefID int64,
ledgerType consts.TenantLedgerType,
amount, deltaBalance, deltaFrozen int64,
idempotencyKey, remark string,
now time.Time,
) (*LedgerApplyResult, error) {
// 关键前置校验:金额必须为正;时间允许由调用方注入,便于测试与一致性落库。
if amount <= 0 {
return nil, errorx.ErrInvalidParameter.WithMsg("amount must be > 0")
}
if now.IsZero() {
now = time.Now()
}
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"operator_user_id": operatorUserID,
"user_id": userID,
"order_id": orderID,
"biz_ref_type": bizRefType,
"biz_ref_id": bizRefID,
"type": ledgerType,
"amount": amount,
"idempotency_key": idempotencyKey,
"delta_balance": deltaBalance,
"delta_frozen": deltaFrozen,
"remark_non_empty": remark != "",
}).Info("services.ledger.apply")
var out LedgerApplyResult
err := tx.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 必须先是租户成员(账本维度仍按 tenant_id 记录)。
var tu models.TenantUser
if err := tx.
Where("tenant_id = ? AND user_id = ?", tenantID, userID).
First(&tu).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errorx.ErrRecordNotFound.WithMsg("tenant user not found")
}
return err
}
// 幂等快速路径:在进入行锁之前先查一次,减少锁竞争(命中则直接返回)。
if idempotencyKey != "" {
var existing models.TenantLedger
if err := tx.
Where("tenant_id = ? AND user_id = ? AND idempotency_key = ?", tenantID, userID, idempotencyKey).
First(&existing).Error; err == nil {
var current models.User
if err := tx.Where("id = ? AND deleted_at IS NULL", userID).First(&current).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errorx.ErrRecordNotFound.WithMsg("user not found")
}
return err
}
out.Ledger = &existing
out.User = &current
return nil
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
}
// 结构化幂等快速路径:当调用方未传 idempotency_key但提供了 biz_ref 时,按 (tenant,biz_ref,type) 去重。
if idempotencyKey == "" && bizRefType != "" && bizRefID > 0 {
var existing models.TenantLedger
if err := tx.
Where("tenant_id = ? AND biz_ref_type = ? AND biz_ref_id = ? AND type = ?", tenantID, bizRefType, bizRefID, ledgerType).
First(&existing).Error; err == nil {
var current models.User
if err := tx.Where("id = ? AND deleted_at IS NULL", userID).First(&current).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errorx.ErrRecordNotFound.WithMsg("user not found")
}
return err
}
out.Ledger = &existing
out.User = &current
return nil
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
}
// 使用行锁锁住 users确保同一用户在“跨租户消费”场景下余额更新的串行一致性。
var u models.User
if err := tx.
Clauses(clause.Locking{Strength: "UPDATE"}).
Where("id = ? AND deleted_at IS NULL", userID).
First(&u).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errorx.ErrRecordNotFound.WithMsg("user not found")
}
return err
}
// 二次幂等校验:防止并发下在获取锁前后插入账本导致的重复写入。
if idempotencyKey != "" {
var existing models.TenantLedger
if err := tx.
Where("tenant_id = ? AND user_id = ? AND idempotency_key = ?", tenantID, userID, idempotencyKey).
First(&existing).Error; err == nil {
out.Ledger = &existing
out.User = &u
return nil
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
}
// 二次结构化幂等校验:与上面的幂等逻辑一致,避免并发下重复写入。
if idempotencyKey == "" && bizRefType != "" && bizRefID > 0 {
var existing models.TenantLedger
if err := tx.
Where("tenant_id = ? AND biz_ref_type = ? AND biz_ref_id = ? AND type = ?", tenantID, bizRefType, bizRefID, ledgerType).
First(&existing).Error; err == nil {
out.Ledger = &existing
out.User = &u
return nil
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
}
balanceBefore := u.Balance
frozenBefore := u.BalanceFrozen
balanceAfter := balanceBefore + deltaBalance
frozenAfter := frozenBefore + deltaFrozen
// 关键不变量:余额/冻结余额不能为负,避免透支或超额解冻。
if balanceAfter < 0 {
return errorx.ErrPreconditionFailed.WithMsg("余额不足")
}
if frozenAfter < 0 {
return errorx.ErrPreconditionFailed.WithMsg("冻结余额不足")
}
// 先更新余额,再写账本:任何一步失败都回滚,保证“余额变更”和“账本记录”一致。
if err := tx.Model(&models.User{}).
Where("id = ?", u.ID).
Updates(map[string]any{
"balance": balanceAfter,
"balance_frozen": frozenAfter,
"updated_at": now,
}).Error; err != nil {
return err
}
// 写入账本:记录变更前后快照,便于对账与审计;幂等键用于去重。
ledger := &models.TenantLedger{
TenantID: tenantID,
OperatorUserID: operatorUserID,
UserID: userID,
OrderID: orderID,
BizRefType: bizRefType,
BizRefID: bizRefID,
Type: ledgerType,
Amount: amount,
BalanceBefore: balanceBefore,
BalanceAfter: balanceAfter,
FrozenBefore: frozenBefore,
FrozenAfter: frozenAfter,
IdempotencyKey: idempotencyKey,
Remark: remark,
CreatedAt: now,
UpdatedAt: now,
}
if err := tx.Create(ledger).Error; err != nil {
// 并发下可能出现“先写成功后再重试”的情况:尝试按幂等键回读,保持接口幂等。
if idempotencyKey != "" {
var existing models.TenantLedger
if e2 := tx.
Where("tenant_id = ? AND user_id = ? AND idempotency_key = ?", tenantID, userID, idempotencyKey).
First(&existing).Error; e2 == nil {
out.Ledger = &existing
out.User = &u
return nil
}
}
// 结构化幂等回读:当未传 idempotency_key 时按 biz_ref 回读。
if idempotencyKey == "" && bizRefType != "" && bizRefID > 0 {
var existing models.TenantLedger
if e2 := tx.
Where("tenant_id = ? AND biz_ref_type = ? AND biz_ref_id = ? AND type = ?", tenantID, bizRefType, bizRefID, ledgerType).
First(&existing).Error; e2 == nil {
out.Ledger = &existing
out.User = &u
return nil
}
}
return err
}
u.Balance = balanceAfter
u.BalanceFrozen = frozenAfter
u.UpdatedAt = now
out.Ledger = ledger
out.User = &u
return nil
})
if err != nil {
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"operator_user_id": operatorUserID,
"user_id": userID,
"order_id": orderID,
"biz_ref_type": bizRefType,
"biz_ref_id": bizRefID,
"type": ledgerType,
"idempotency_key": idempotencyKey,
}).WithError(err).Warn("services.ledger.apply.failed")
return nil, err
}
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"operator_user_id": operatorUserID,
"user_id": userID,
"order_id": orderID,
"biz_ref_type": bizRefType,
"biz_ref_id": bizRefID,
"type": ledgerType,
"ledger_id": out.Ledger.ID,
"idempotency_key": idempotencyKey,
"balance_after": out.User.Balance,
"frozen_after": out.User.BalanceFrozen,
}).Info("services.ledger.apply.ok")
return &out, nil
}