package services import ( "context" "errors" "time" "quyun/v2/app/errorx" "quyun/v2/app/http/tenant/dto" "quyun/v2/app/requests" "quyun/v2/database/models" "quyun/v2/pkg/consts" "github.com/samber/lo" "github.com/sirupsen/logrus" "go.ipao.vip/gen" "gorm.io/gorm" "gorm.io/gorm/clause" ) // LedgerApplyResult 表示一次账本写入(含幂等命中)的结果,包含账本记录与租户用户余额快照。 type LedgerApplyResult struct { // Ledger 为本次创建的账本记录(若幂等命中则返回已有记录)。 Ledger *models.TenantLedger // TenantUser 为写入后余额状态(若幂等命中则返回当前快照)。 TenantUser *models.TenantUser } // ledger 提供租户余额账本能力(冻结/解冻/扣减/退款/充值),支持幂等与行锁保证一致性。 // // @provider type ledger struct { db *gorm.DB } // MyBalance 查询当前用户在指定租户下的余额信息(可用/冻结)。 func (s *ledger) MyBalance(ctx context.Context, tenantID, userID int64) (*models.TenantUser, error) { if tenantID <= 0 || userID <= 0 { return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/user_id must be > 0") } logrus.WithFields(logrus.Fields{ "tenant_id": tenantID, "user_id": userID, }).Info("services.ledger.me.balance") tbl, query := models.TenantUserQuery.QueryContext(ctx) m, err := query.Where(tbl.TenantID.Eq(tenantID), tbl.UserID.Eq(userID)).First() if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil, errorx.ErrRecordNotFound.WithMsg("tenant user not found") } return nil, err } return m, nil } // MyLedgerPage 分页查询当前用户在指定租户下的余额流水(用于“我的流水”)。 func (s *ledger) MyLedgerPage(ctx context.Context, tenantID, userID int64, filter *dto.MyLedgerListFilter) (*requests.Pager, error) { if tenantID <= 0 || userID <= 0 { return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/user_id must be > 0") } if filter == nil { filter = &dto.MyLedgerListFilter{} } logrus.WithFields(logrus.Fields{ "tenant_id": tenantID, "user_id": userID, "type": lo.FromPtr(filter.Type), "order_id": lo.FromPtr(filter.OrderID), }).Info("services.ledger.me.ledgers.page") filter.Pagination.Format() tbl, query := models.TenantLedgerQuery.QueryContext(ctx) conds := []gen.Condition{ tbl.TenantID.Eq(tenantID), tbl.UserID.Eq(userID), } if filter.Type != nil { conds = append(conds, tbl.Type.Eq(*filter.Type)) } if filter.OrderID != nil && *filter.OrderID > 0 { conds = append(conds, tbl.OrderID.Eq(*filter.OrderID)) } if filter.CreatedAtFrom != nil { conds = append(conds, tbl.CreatedAt.Gte(*filter.CreatedAtFrom)) } if filter.CreatedAtTo != nil { conds = append(conds, tbl.CreatedAt.Lte(*filter.CreatedAtTo)) } ledgers, total, err := query.Where(conds...).Order(tbl.ID.Desc()).FindByPage(int(filter.Offset()), int(filter.Limit)) if err != nil { return nil, err } items := lo.Map(ledgers, func(m *models.TenantLedger, _ int) *dto.MyLedgerItem { return &dto.MyLedgerItem{ Ledger: m, TypeDescription: m.Type.Description(), } }) return &requests.Pager{ Pagination: filter.Pagination, Total: total, Items: items, }, nil } // AdminLedgerPage 分页查询租户内余额流水(租户后台审计用)。 func (s *ledger) AdminLedgerPage(ctx context.Context, tenantID int64, filter *dto.AdminLedgerListFilter) (*requests.Pager, error) { if tenantID <= 0 { return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id must be > 0") } if filter == nil { filter = &dto.AdminLedgerListFilter{} } logrus.WithFields(logrus.Fields{ "tenant_id": tenantID, "operator_user_id": lo.FromPtr(filter.OperatorUserID), "user_id": lo.FromPtr(filter.UserID), "type": lo.FromPtr(filter.Type), "order_id": lo.FromPtr(filter.OrderID), "biz_ref_type": lo.FromPtr(filter.BizRefType), "biz_ref_id": lo.FromPtr(filter.BizRefID), "created_at_from": filter.CreatedAtFrom, "created_at_to": filter.CreatedAtTo, "pagination_page": filter.Page, "pagination_limit": filter.Limit, "pagination_offset": filter.Offset(), }).Info("services.ledger.admin.page") filter.Pagination.Format() tbl, query := models.TenantLedgerQuery.QueryContext(ctx) conds := []gen.Condition{tbl.TenantID.Eq(tenantID)} if filter.OperatorUserID != nil && *filter.OperatorUserID > 0 { conds = append(conds, tbl.OperatorUserID.Eq(*filter.OperatorUserID)) } if filter.UserID != nil && *filter.UserID > 0 { conds = append(conds, tbl.UserID.Eq(*filter.UserID)) } if filter.Type != nil { conds = append(conds, tbl.Type.Eq(*filter.Type)) } if filter.OrderID != nil && *filter.OrderID > 0 { conds = append(conds, tbl.OrderID.Eq(*filter.OrderID)) } if filter.BizRefType != nil && *filter.BizRefType != "" { conds = append(conds, tbl.BizRefType.Eq(*filter.BizRefType)) } if filter.BizRefID != nil && *filter.BizRefID > 0 { conds = append(conds, tbl.BizRefID.Eq(*filter.BizRefID)) } if filter.CreatedAtFrom != nil { conds = append(conds, tbl.CreatedAt.Gte(*filter.CreatedAtFrom)) } if filter.CreatedAtTo != nil { conds = append(conds, tbl.CreatedAt.Lte(*filter.CreatedAtTo)) } ledgers, total, err := query.Where(conds...).Order(tbl.ID.Desc()).FindByPage(int(filter.Offset()), int(filter.Limit)) if err != nil { return nil, err } items := lo.Map(ledgers, func(m *models.TenantLedger, _ int) *dto.AdminLedgerItem { return &dto.AdminLedgerItem{ Ledger: m, TypeDescription: m.Type.Description(), } }) return &requests.Pager{ Pagination: filter.Pagination, Total: total, Items: items, }, nil } // Freeze 将可用余额转入冻结余额,并写入账本记录。 func (s *ledger) Freeze(ctx context.Context, tenantID, userID, orderID, amount int64, idempotencyKey, remark string, now time.Time) (*LedgerApplyResult, error) { // 冻结通常由用户自己发起(下单冻结);操作者默认等于余额账户归属 user_id。 bizRefType := "" if orderID > 0 { bizRefType = "order" } return s.apply(ctx, s.db, tenantID, userID, userID, orderID, bizRefType, orderID, consts.TenantLedgerTypeFreeze, amount, -amount, amount, idempotencyKey, remark, now) } // Unfreeze 将冻结余额转回可用余额,并写入账本记录。 func (s *ledger) Unfreeze(ctx context.Context, tenantID, userID, orderID, amount int64, idempotencyKey, remark string, now time.Time) (*LedgerApplyResult, error) { // 解冻通常由用户自己发起(失败回滚/退款回滚等可能由系统或管理员触发;此处默认等于 user_id)。 bizRefType := "" if orderID > 0 { bizRefType = "order" } return s.apply(ctx, s.db, tenantID, userID, userID, orderID, bizRefType, orderID, consts.TenantLedgerTypeUnfreeze, amount, amount, -amount, idempotencyKey, remark, now) } // FreezeTx 为 Freeze 的事务版本(由外层事务控制提交/回滚)。 func (s *ledger) FreezeTx(ctx context.Context, tx *gorm.DB, tenantID, operatorUserID, userID, orderID, amount int64, idempotencyKey, remark string, now time.Time) (*LedgerApplyResult, error) { bizRefType := "" if orderID > 0 { bizRefType = "order" } return s.apply(ctx, tx, tenantID, operatorUserID, userID, orderID, bizRefType, orderID, consts.TenantLedgerTypeFreeze, amount, -amount, amount, idempotencyKey, remark, now) } // UnfreezeTx 为 Unfreeze 的事务版本(由外层事务控制提交/回滚)。 func (s *ledger) UnfreezeTx(ctx context.Context, tx *gorm.DB, tenantID, operatorUserID, userID, orderID, amount int64, idempotencyKey, remark string, now time.Time) (*LedgerApplyResult, error) { bizRefType := "" if orderID > 0 { bizRefType = "order" } return s.apply(ctx, tx, tenantID, operatorUserID, userID, orderID, bizRefType, orderID, consts.TenantLedgerTypeUnfreeze, amount, amount, -amount, idempotencyKey, remark, now) } // DebitPurchaseTx 将冻结资金转为实际扣款(减少冻结余额),并写入账本记录。 func (s *ledger) DebitPurchaseTx(ctx context.Context, tx *gorm.DB, tenantID, operatorUserID, userID, orderID, amount int64, idempotencyKey, remark string, now time.Time) (*LedgerApplyResult, error) { return s.apply(ctx, tx, tenantID, operatorUserID, userID, orderID, "order", orderID, consts.TenantLedgerTypeDebitPurchase, amount, 0, -amount, idempotencyKey, remark, now) } // CreditRefundTx 将退款金额退回到可用余额,并写入账本记录。 func (s *ledger) CreditRefundTx(ctx context.Context, tx *gorm.DB, tenantID, operatorUserID, userID, orderID, amount int64, idempotencyKey, remark string, now time.Time) (*LedgerApplyResult, error) { return s.apply(ctx, tx, tenantID, operatorUserID, userID, orderID, "order", orderID, consts.TenantLedgerTypeCreditRefund, amount, amount, 0, idempotencyKey, remark, now) } // CreditTopupTx 将充值金额记入可用余额,并写入账本记录。 func (s *ledger) CreditTopupTx(ctx context.Context, tx *gorm.DB, tenantID, operatorUserID, userID, orderID, amount int64, idempotencyKey, remark string, now time.Time) (*LedgerApplyResult, error) { return s.apply(ctx, tx, tenantID, operatorUserID, userID, orderID, "order", orderID, consts.TenantLedgerTypeCreditTopup, amount, amount, 0, idempotencyKey, remark, now) } func (s *ledger) apply( ctx context.Context, tx *gorm.DB, tenantID, operatorUserID, userID, orderID int64, bizRefType string, bizRefID int64, ledgerType consts.TenantLedgerType, amount, deltaBalance, deltaFrozen int64, idempotencyKey, remark string, now time.Time, ) (*LedgerApplyResult, error) { // 关键前置校验:金额必须为正;时间允许由调用方注入,便于测试与一致性落库。 if amount <= 0 { return nil, errorx.ErrInvalidParameter.WithMsg("amount must be > 0") } if now.IsZero() { now = time.Now() } logrus.WithFields(logrus.Fields{ "tenant_id": tenantID, "operator_user_id": operatorUserID, "user_id": userID, "order_id": orderID, "biz_ref_type": bizRefType, "biz_ref_id": bizRefID, "type": ledgerType, "amount": amount, "idempotency_key": idempotencyKey, "delta_balance": deltaBalance, "delta_frozen": deltaFrozen, "remark_non_empty": remark != "", }).Info("services.ledger.apply") var out LedgerApplyResult err := tx.WithContext(ctx).Transaction(func(tx *gorm.DB) error { // 幂等快速路径:在进入行锁之前先查一次,减少锁竞争(命中则直接返回)。 if idempotencyKey != "" { var existing models.TenantLedger if err := tx. Where("tenant_id = ? AND user_id = ? AND idempotency_key = ?", tenantID, userID, idempotencyKey). First(&existing).Error; err == nil { var current models.TenantUser if err := tx.Where("tenant_id = ? AND user_id = ?", tenantID, userID).First(¤t).Error; err != nil { return err } out.Ledger = &existing out.TenantUser = ¤t return nil } else if !errors.Is(err, gorm.ErrRecordNotFound) { return err } } // 结构化幂等快速路径:当调用方未传 idempotency_key,但提供了 biz_ref 时,按 (tenant,biz_ref,type) 去重。 if idempotencyKey == "" && bizRefType != "" && bizRefID > 0 { var existing models.TenantLedger if err := tx. Where("tenant_id = ? AND biz_ref_type = ? AND biz_ref_id = ? AND type = ?", tenantID, bizRefType, bizRefID, ledgerType). First(&existing).Error; err == nil { var current models.TenantUser if err := tx.Where("tenant_id = ? AND user_id = ?", tenantID, userID).First(¤t).Error; err != nil { return err } out.Ledger = &existing out.TenantUser = ¤t return nil } else if !errors.Is(err, gorm.ErrRecordNotFound) { return err } } // 使用行锁锁住 tenant_users,确保同一租户下同一用户余额更新的串行一致性。 var tu models.TenantUser if err := tx. Clauses(clause.Locking{Strength: "UPDATE"}). Where("tenant_id = ? AND user_id = ?", tenantID, userID). First(&tu).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return errorx.ErrRecordNotFound.WithMsg("tenant user not found") } return err } // 二次幂等校验:防止并发下在获取锁前后插入账本导致的重复写入。 if idempotencyKey != "" { var existing models.TenantLedger if err := tx. Where("tenant_id = ? AND user_id = ? AND idempotency_key = ?", tenantID, userID, idempotencyKey). First(&existing).Error; err == nil { out.Ledger = &existing out.TenantUser = &tu return nil } else if !errors.Is(err, gorm.ErrRecordNotFound) { return err } } // 二次结构化幂等校验:与上面的幂等逻辑一致,避免并发下重复写入。 if idempotencyKey == "" && bizRefType != "" && bizRefID > 0 { var existing models.TenantLedger if err := tx. Where("tenant_id = ? AND biz_ref_type = ? AND biz_ref_id = ? AND type = ?", tenantID, bizRefType, bizRefID, ledgerType). First(&existing).Error; err == nil { out.Ledger = &existing out.TenantUser = &tu return nil } else if !errors.Is(err, gorm.ErrRecordNotFound) { return err } } balanceBefore := tu.Balance frozenBefore := tu.BalanceFrozen balanceAfter := balanceBefore + deltaBalance frozenAfter := frozenBefore + deltaFrozen // 关键不变量:余额/冻结余额不能为负,避免透支或超额解冻。 if balanceAfter < 0 { return errorx.ErrPreconditionFailed.WithMsg("余额不足") } if frozenAfter < 0 { return errorx.ErrPreconditionFailed.WithMsg("冻结余额不足") } // 先更新余额,再写账本:任何一步失败都回滚,保证“余额变更”和“账本记录”一致。 if err := tx.Model(&models.TenantUser{}). Where("id = ?", tu.ID). Updates(map[string]any{ "balance": balanceAfter, "balance_frozen": frozenAfter, "updated_at": now, }).Error; err != nil { return err } // 写入账本:记录变更前后快照,便于对账与审计;幂等键用于去重。 ledger := &models.TenantLedger{ TenantID: tenantID, OperatorUserID: operatorUserID, UserID: userID, OrderID: orderID, BizRefType: bizRefType, BizRefID: bizRefID, Type: ledgerType, Amount: amount, BalanceBefore: balanceBefore, BalanceAfter: balanceAfter, FrozenBefore: frozenBefore, FrozenAfter: frozenAfter, IdempotencyKey: idempotencyKey, Remark: remark, CreatedAt: now, UpdatedAt: now, } if err := tx.Create(ledger).Error; err != nil { // 并发下可能出现“先写成功后再重试”的情况:尝试按幂等键回读,保持接口幂等。 if idempotencyKey != "" { var existing models.TenantLedger if e2 := tx. Where("tenant_id = ? AND user_id = ? AND idempotency_key = ?", tenantID, userID, idempotencyKey). First(&existing).Error; e2 == nil { out.Ledger = &existing out.TenantUser = &tu return nil } } // 结构化幂等回读:当未传 idempotency_key 时按 biz_ref 回读。 if idempotencyKey == "" && bizRefType != "" && bizRefID > 0 { var existing models.TenantLedger if e2 := tx. Where("tenant_id = ? AND biz_ref_type = ? AND biz_ref_id = ? AND type = ?", tenantID, bizRefType, bizRefID, ledgerType). First(&existing).Error; e2 == nil { out.Ledger = &existing out.TenantUser = &tu return nil } } return err } tu.Balance = balanceAfter tu.BalanceFrozen = frozenAfter tu.UpdatedAt = now out.Ledger = ledger out.TenantUser = &tu return nil }) if err != nil { logrus.WithFields(logrus.Fields{ "tenant_id": tenantID, "operator_user_id": operatorUserID, "user_id": userID, "order_id": orderID, "biz_ref_type": bizRefType, "biz_ref_id": bizRefID, "type": ledgerType, "idempotency_key": idempotencyKey, }).WithError(err).Warn("services.ledger.apply.failed") return nil, err } logrus.WithFields(logrus.Fields{ "tenant_id": tenantID, "operator_user_id": operatorUserID, "user_id": userID, "order_id": orderID, "biz_ref_type": bizRefType, "biz_ref_id": bizRefID, "type": ledgerType, "ledger_id": out.Ledger.ID, "idempotency_key": idempotencyKey, "balance_after": out.TenantUser.Balance, "frozen_after": out.TenantUser.BalanceFrozen, }).Info("services.ledger.apply.ok") return &out, nil }