feat: 移除“租户管理员为用户充值 / 每租户一套余额”能力:余额统一为全局用户余额

This commit is contained in:
2025-12-23 10:59:59 +08:00
parent dd7bcdfb98
commit a80c9b759b
39 changed files with 566 additions and 1869 deletions

View File

@@ -1,35 +0,0 @@
package migrate
import (
"context"
"database/sql"
"github.com/pkg/errors"
"github.com/pressly/goose/v3"
"github.com/riverqueue/river/riverdriver/riverdatabasesql"
"github.com/riverqueue/river/rivermigrate"
)
func init() {
goose.AddMigrationNoTxContext(RiverQueueUp, RiverQueueDown)
}
func RiverQueueUp(ctx context.Context, db *sql.DB) error {
migrator, err := rivermigrate.New(riverdatabasesql.New(db), nil)
if err != nil {
return errors.Wrap(err, "river migrate up failed")
}
_, err = migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{TargetVersion: -1})
return err
}
func RiverQueueDown(ctx context.Context, db *sql.DB) error {
migrator, err := rivermigrate.New(riverdatabasesql.New(db), nil)
if err != nil {
return errors.Wrap(err, "river migrate down failed")
}
_, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{TargetVersion: -1})
return err
}

View File

@@ -57,33 +57,27 @@ func Serve(cmd *cobra.Command, args []string) error {
goose.SetBaseFS(database.MigrationFS)
goose.SetTableName("migrations")
goose.AddNamedMigrationNoTxContext("0001_river_job.go", RiverUp, RiverDown)
goose.AddNamedMigrationNoTxContext(
"10000000000001_river_job.go",
func(ctx context.Context, db *sql.DB) error {
migrator, err := rivermigrate.New(riverdatabasesql.New(db), nil)
if err != nil {
return err
}
_, err = migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{TargetVersion: -1})
return err
},
func(ctx context.Context, db *sql.DB) error {
migrator, err := rivermigrate.New(riverdatabasesql.New(db), nil)
if err != nil {
return err
}
_, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{TargetVersion: -1})
return err
})
return goose.RunContext(context.Background(), action, svc.DB, "migrations", args...)
})
}
func RiverUp(ctx context.Context, db *sql.DB) error {
migrator, err := rivermigrate.New(riverdatabasesql.New(db), nil)
if err != nil {
return err
}
// Migrate up. An empty MigrateOpts will migrate all the way up, but
// best practice is to specify a specific target version.
_, err = migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{})
return err
}
func RiverDown(ctx context.Context, db *sql.DB) error {
migrator, err := rivermigrate.New(riverdatabasesql.New(db), nil)
if err != nil {
return err
}
// TargetVersion -1 removes River's schema completely.
_, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
TargetVersion: -1,
})
return err
}

View File

@@ -18,7 +18,7 @@ type AdminLedgerListFilter struct {
requests.Pagination `json:",inline" query:",inline"`
// OperatorUserID 按操作者用户ID过滤可选
// 典型场景:后台检索“某个管理员发起的充值/退款”等敏感操作流水。
// 典型场景:后台检索“某个管理员发起的退款/调账”等敏感操作流水。
OperatorUserID *int64 `json:"operator_user_id,omitempty" query:"operator_user_id"`
// UserID 按余额账户归属用户ID过滤可选
@@ -32,7 +32,7 @@ type AdminLedgerListFilter struct {
OrderID *int64 `json:"order_id,omitempty" query:"order_id"`
// BizRefType 按业务引用类型过滤(可选)。
// 约定:当前业务写入为 "order";未来可扩展为 refund/topup 等。
// 约定:当前业务写入为 "order";未来可扩展为 refund 等。
BizRefType *string `json:"biz_ref_type,omitempty" query:"biz_ref_type"`
// BizRefID 按业务引用ID过滤可选

View File

@@ -29,7 +29,7 @@ type AdminOrderListFilter struct {
// ContentTitle 内容标题关键字(可选):通过 order_items + contents 关联,模糊匹配 contents.titlelike
ContentTitle *string `json:"content_title,omitempty" query:"content_title"`
// Type 订单类型可选content_purchase/topup 等。
// Type 订单类型可选content_purchase 等。
Type *consts.OrderType `json:"type,omitempty" query:"type"`
// Status 订单状态可选created/paid/refunding/refunded/canceled/failed。

View File

@@ -1,65 +0,0 @@
package dto
// AdminBatchTopupItem 批量充值的单条明细。
type AdminBatchTopupItem struct {
// UserID 目标用户ID必须属于当前租户否则该条充值失败。
UserID int64 `json:"user_id"`
// Amount 充值金额:单位分;必须 > 0。
Amount int64 `json:"amount"`
// Reason 充值原因(可选):用于审计与追溯。
Reason string `json:"reason,omitempty"`
// IdempotencyKey 幂等键(可选):为空时后端会用 batch_idempotency_key 派生生成;
// 建议前端/调用方提供稳定值,便于重试时保持结果一致。
IdempotencyKey string `json:"idempotency_key,omitempty"`
}
// AdminBatchTopupForm 租户管理员批量充值请求参数。
type AdminBatchTopupForm struct {
// BatchIdempotencyKey 批次幂等键:必须填写;用于重试同一批次时保证不会重复入账。
BatchIdempotencyKey string `json:"batch_idempotency_key"`
// Items 充值明细列表:至少 1 条;单批次条数在业务侧限制(避免拖垮系统)。
Items []*AdminBatchTopupItem `json:"items"`
}
// AdminBatchTopupResultItem 批量充值的单条处理结果。
type AdminBatchTopupResultItem struct {
// UserID 目标用户ID。
UserID int64 `json:"user_id"`
// Amount 充值金额(单位分)。
Amount int64 `json:"amount"`
// IdempotencyKey 实际使用的幂等键:可能为客户端传入,也可能为后端派生生成。
IdempotencyKey string `json:"idempotency_key"`
// OrderID 生成的订单ID成功时返回失败时为 0。
OrderID int64 `json:"order_id,omitempty"`
// OK 是否成功true 表示该条充值已成功入账或命中幂等成功结果。
OK bool `json:"ok"`
// ErrorCode 错误码:失败时返回;成功时为 0。
ErrorCode int `json:"error_code,omitempty"`
// ErrorMessage 错误信息:失败时返回;成功时为空。
ErrorMessage string `json:"error_message,omitempty"`
}
// AdminBatchTopupResponse 批量充值的汇总结果。
type AdminBatchTopupResponse struct {
// Total 总条数:等于 items 长度。
Total int `json:"total"`
// Success 成功条数。
Success int `json:"success"`
// Failed 失败条数。
Failed int `json:"failed"`
// Items 明细结果列表:与请求 items 顺序一致,便于前端逐条展示。
Items []*AdminBatchTopupResultItem `json:"items"`
}

View File

@@ -1,11 +0,0 @@
package dto
// AdminTopupForm defines payload for tenant-admin to topup a tenant member balance.
type AdminTopupForm struct {
// Amount is the topup amount in cents (CNY 分); must be > 0.
Amount int64 `json:"amount,omitempty"`
// Reason is the human-readable topup reason used for audit.
Reason string `json:"reason,omitempty"`
// IdempotencyKey ensures the topup request is processed at most once.
IdempotencyKey string `json:"idempotency_key,omitempty"`
}

View File

@@ -135,25 +135,25 @@ func (*orderAdmin) adminOrderDetail(
return &dto.AdminOrderDetail{Order: m}, nil
}
// adminRefund
//
// @Summary 订单退款(租户管理)
// @Description 该接口只负责将订单从 paid 推进到 refunding并提交异步退款任务退款入账与权益回收由 worker 异步完成。
// @Description 重复请求幂等:订单处于 refunding/refunded 时会返回当前订单状态,不会重复入账/重复回收权益。
// @Tags Tenant
// @Accept json
// @Produce json
// @Param tenantCode path string true "Tenant Code"
// @Param orderID path int64 true "OrderID"
// @Param form body dto.AdminOrderRefundForm true "Form"
// @Success 200 {object} models.Order
//
// @Router /t/:tenantCode/v1/admin/orders/:orderID/refund [post]
// @Bind tenant local key(tenant)
// @Bind tenantUser local key(tenant_user)
// @Bind orderID path
// @Bind form body
func (*orderAdmin) adminRefund(
// adminRefund
//
// @Summary 订单退款(租户管理)
// @Description 该接口只负责将订单从 paid 推进到 refunding并提交异步退款任务退款入账与权益回收由 worker 异步完成。
// @Description 重复请求幂等:订单处于 refunding/refunded 时会返回当前订单状态,不会重复入账/重复回收权益。
// @Tags Tenant
// @Accept json
// @Produce json
// @Param tenantCode path string true "Tenant Code"
// @Param orderID path int64 true "OrderID"
// @Param form body dto.AdminOrderRefundForm true "Form"
// @Success 200 {object} models.Order
//
// @Router /t/:tenantCode/v1/admin/orders/:orderID/refund [post]
// @Bind tenant local key(tenant)
// @Bind tenantUser local key(tenant_user)
// @Bind orderID path
// @Bind form body
func (*orderAdmin) adminRefund(
ctx fiber.Ctx,
tenant *models.Tenant,
tenantUser *models.TenantUser,
@@ -187,88 +187,5 @@ func (*orderAdmin) adminOrderDetail(
)
}
// adminTopupUser
//
// @Summary 为租户成员充值(租户管理)
// @Tags Tenant
// @Accept json
// @Produce json
// @Param tenantCode path string true "Tenant Code"
// @Param userID path int64 true "UserID"
// @Param form body dto.AdminTopupForm true "Form"
// @Success 200 {object} models.Order
//
// @Router /t/:tenantCode/v1/admin/users/:userID/topup [post]
// @Bind tenant local key(tenant)
// @Bind tenantUser local key(tenant_user)
// @Bind userID path
// @Bind form body
func (*orderAdmin) adminTopupUser(
ctx fiber.Ctx,
tenant *models.Tenant,
tenantUser *models.TenantUser,
userID int64,
form *dto.AdminTopupForm,
) (*models.Order, error) {
if err := requireTenantAdmin(tenantUser); err != nil {
return nil, err
}
if form == nil {
return nil, errorx.ErrInvalidParameter
}
log.WithFields(log.Fields{
"tenant_id": tenant.ID,
"operator_user": tenantUser.UserID,
"target_user": userID,
"amount": form.Amount,
"idempotency_key": form.IdempotencyKey,
}).Info("tenant.admin.users.topup")
return services.Order.AdminTopupUser(
ctx,
tenant.ID,
tenantUser.UserID,
userID,
form.Amount,
form.IdempotencyKey,
form.Reason,
time.Now(),
)
}
// adminBatchTopupUsers
//
// @Summary 批量为租户成员充值(租户管理)
// @Tags Tenant
// @Accept json
// @Produce json
// @Param tenantCode path string true "Tenant Code"
// @Param form body dto.AdminBatchTopupForm true "Form"
// @Success 200 {object} dto.AdminBatchTopupResponse
//
// @Router /t/:tenantCode/v1/admin/users/topup/batch [post]
// @Bind tenant local key(tenant)
// @Bind tenantUser local key(tenant_user)
// @Bind form body
func (*orderAdmin) adminBatchTopupUsers(
ctx fiber.Ctx,
tenant *models.Tenant,
tenantUser *models.TenantUser,
form *dto.AdminBatchTopupForm,
) (*dto.AdminBatchTopupResponse, error) {
if err := requireTenantAdmin(tenantUser); err != nil {
return nil, err
}
if form == nil {
return nil, errorx.ErrInvalidParameter
}
log.WithFields(log.Fields{
"tenant_id": tenant.ID,
"user_id": tenantUser.UserID,
"total": len(form.Items),
}).Info("tenant.admin.users.topup.batch")
return services.Order.AdminBatchTopupUsers(ctx, tenant.ID, tenantUser.UserID, form, time.Now())
}
// 注意:已移除“租户管理员为用户充值”能力。
// 余额已改为 users 表的全局余额,用户可在已加入租户间共享消费;按租户充值会导致账务复杂且易出错。

View File

@@ -218,21 +218,6 @@ func (r *Routes) Register(router fiber.Router) {
PathParam[int64]("orderID"),
Body[dto.AdminOrderRefundForm]("form"),
))
r.log.Debugf("Registering route: Post /t/:tenantCode/v1/admin/users/:userID/topup -> orderAdmin.adminTopupUser")
router.Post("/t/:tenantCode/v1/admin/users/:userID/topup"[len(r.Path()):], DataFunc4(
r.orderAdmin.adminTopupUser,
Local[*models.Tenant]("tenant"),
Local[*models.TenantUser]("tenant_user"),
PathParam[int64]("userID"),
Body[dto.AdminTopupForm]("form"),
))
r.log.Debugf("Registering route: Post /t/:tenantCode/v1/admin/users/topup/batch -> orderAdmin.adminBatchTopupUsers")
router.Post("/t/:tenantCode/v1/admin/users/topup/batch"[len(r.Path()):], DataFunc3(
r.orderAdmin.adminBatchTopupUsers,
Local[*models.Tenant]("tenant"),
Local[*models.TenantUser]("tenant_user"),
Body[dto.AdminBatchTopupForm]("form"),
))
// Register routes for controller: orderMe
r.log.Debugf("Registering route: Get /t/:tenantCode/v1/orders -> orderMe.myOrders")
router.Get("/t/:tenantCode/v1/orders"[len(r.Path()):], DataFunc3(

View File

@@ -18,23 +18,25 @@ import (
"gorm.io/gorm/clause"
)
// LedgerApplyResult 表示一次账本写入(含幂等命中)的结果,包含账本记录与租户用户余额快照。
// LedgerApplyResult 表示一次账本写入(含幂等命中)的结果,包含账本记录与用户余额快照。
type LedgerApplyResult struct {
// Ledger 为本次创建的账本记录(若幂等命中则返回已有记录)。
Ledger *models.TenantLedger
// TenantUser 为写入后余额状态(若幂等命中则返回当前快照)。
TenantUser *models.TenantUser
// User 为写入后余额状态(若幂等命中则返回当前快照)。
User *models.User
}
// ledger 提供租户余额账本能力(冻结/解冻/扣减/退款/充值),支持幂等与行锁保证一致性。
// ledger 提供租户账本能力(冻结/解冻/扣减/退款),支持幂等与行锁保证一致性。
// 注意:余额为 users 表的全局余额,用户可在已加入租户间共享消费。
//
// @provider
type ledger struct {
db *gorm.DB
}
// MyBalance 查询当前用户在指定租户下的余额信息(可用/冻结)。
func (s *ledger) MyBalance(ctx context.Context, tenantID, userID int64) (*models.TenantUser, error) {
// MyBalance 查询当前用户的全局余额信息(可用/冻结)。
// 语义:必须先是该租户成员(否则返回 not found但余额数据来源为 users。
func (s *ledger) MyBalance(ctx context.Context, tenantID, userID int64) (*models.User, error) {
if tenantID <= 0 || userID <= 0 {
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/user_id must be > 0")
}
@@ -44,14 +46,23 @@ func (s *ledger) MyBalance(ctx context.Context, tenantID, userID int64) (*models
"user_id": userID,
}).Info("services.ledger.me.balance")
tbl, query := models.TenantUserQuery.QueryContext(ctx)
m, err := query.Where(tbl.TenantID.Eq(tenantID), tbl.UserID.Eq(userID)).First()
if err != nil {
// 必须先是租户成员。
tblTU, queryTU := models.TenantUserQuery.QueryContext(ctx)
if _, err := queryTU.Where(tblTU.TenantID.Eq(tenantID), tblTU.UserID.Eq(userID)).First(); err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, errorx.ErrRecordNotFound.WithMsg("tenant user not found")
}
return nil, err
}
tblU, queryU := models.UserQuery.QueryContext(ctx)
m, err := queryU.Where(tblU.ID.Eq(userID), tblU.DeletedAt.IsNull()).First()
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, errorx.ErrRecordNotFound.WithMsg("user not found")
}
return nil, err
}
return m, nil
}
@@ -232,11 +243,6 @@ func (s *ledger) CreditRefundTx(ctx context.Context, tx *gorm.DB, tenantID, oper
return s.apply(ctx, tx, tenantID, operatorUserID, userID, orderID, "order", orderID, consts.TenantLedgerTypeCreditRefund, amount, amount, 0, idempotencyKey, remark, now)
}
// CreditTopupTx 将充值金额记入可用余额,并写入账本记录。
func (s *ledger) CreditTopupTx(ctx context.Context, tx *gorm.DB, tenantID, operatorUserID, userID, orderID, amount int64, idempotencyKey, remark string, now time.Time) (*LedgerApplyResult, error) {
return s.apply(ctx, tx, tenantID, operatorUserID, userID, orderID, "order", orderID, consts.TenantLedgerTypeCreditTopup, amount, amount, 0, idempotencyKey, remark, now)
}
func (s *ledger) apply(
ctx context.Context,
tx *gorm.DB,
@@ -273,18 +279,32 @@ func (s *ledger) apply(
var out LedgerApplyResult
err := tx.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 必须先是租户成员(账本维度仍按 tenant_id 记录)。
var tu models.TenantUser
if err := tx.
Where("tenant_id = ? AND user_id = ?", tenantID, userID).
First(&tu).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errorx.ErrRecordNotFound.WithMsg("tenant user not found")
}
return err
}
// 幂等快速路径:在进入行锁之前先查一次,减少锁竞争(命中则直接返回)。
if idempotencyKey != "" {
var existing models.TenantLedger
if err := tx.
Where("tenant_id = ? AND user_id = ? AND idempotency_key = ?", tenantID, userID, idempotencyKey).
First(&existing).Error; err == nil {
var current models.TenantUser
if err := tx.Where("tenant_id = ? AND user_id = ?", tenantID, userID).First(&current).Error; err != nil {
var current models.User
if err := tx.Where("id = ? AND deleted_at IS NULL", userID).First(&current).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errorx.ErrRecordNotFound.WithMsg("user not found")
}
return err
}
out.Ledger = &existing
out.TenantUser = &current
out.User = &current
return nil
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
@@ -297,26 +317,29 @@ func (s *ledger) apply(
if err := tx.
Where("tenant_id = ? AND biz_ref_type = ? AND biz_ref_id = ? AND type = ?", tenantID, bizRefType, bizRefID, ledgerType).
First(&existing).Error; err == nil {
var current models.TenantUser
if err := tx.Where("tenant_id = ? AND user_id = ?", tenantID, userID).First(&current).Error; err != nil {
var current models.User
if err := tx.Where("id = ? AND deleted_at IS NULL", userID).First(&current).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errorx.ErrRecordNotFound.WithMsg("user not found")
}
return err
}
out.Ledger = &existing
out.TenantUser = &current
out.User = &current
return nil
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
}
// 使用行锁锁住 tenant_users确保同一租户下同一用户余额更新的串行一致性。
var tu models.TenantUser
// 使用行锁锁住 users确保同一用户在“跨租户消费”场景下余额更新的串行一致性。
var u models.User
if err := tx.
Clauses(clause.Locking{Strength: "UPDATE"}).
Where("tenant_id = ? AND user_id = ?", tenantID, userID).
First(&tu).Error; err != nil {
Where("id = ? AND deleted_at IS NULL", userID).
First(&u).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errorx.ErrRecordNotFound.WithMsg("tenant user not found")
return errorx.ErrRecordNotFound.WithMsg("user not found")
}
return err
}
@@ -328,7 +351,7 @@ func (s *ledger) apply(
Where("tenant_id = ? AND user_id = ? AND idempotency_key = ?", tenantID, userID, idempotencyKey).
First(&existing).Error; err == nil {
out.Ledger = &existing
out.TenantUser = &tu
out.User = &u
return nil
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
@@ -342,15 +365,15 @@ func (s *ledger) apply(
Where("tenant_id = ? AND biz_ref_type = ? AND biz_ref_id = ? AND type = ?", tenantID, bizRefType, bizRefID, ledgerType).
First(&existing).Error; err == nil {
out.Ledger = &existing
out.TenantUser = &tu
out.User = &u
return nil
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
}
balanceBefore := tu.Balance
frozenBefore := tu.BalanceFrozen
balanceBefore := u.Balance
frozenBefore := u.BalanceFrozen
balanceAfter := balanceBefore + deltaBalance
frozenAfter := frozenBefore + deltaFrozen
@@ -363,8 +386,8 @@ func (s *ledger) apply(
}
// 先更新余额,再写账本:任何一步失败都回滚,保证“余额变更”和“账本记录”一致。
if err := tx.Model(&models.TenantUser{}).
Where("id = ?", tu.ID).
if err := tx.Model(&models.User{}).
Where("id = ?", u.ID).
Updates(map[string]any{
"balance": balanceAfter,
"balance_frozen": frozenAfter,
@@ -400,7 +423,7 @@ func (s *ledger) apply(
Where("tenant_id = ? AND user_id = ? AND idempotency_key = ?", tenantID, userID, idempotencyKey).
First(&existing).Error; e2 == nil {
out.Ledger = &existing
out.TenantUser = &tu
out.User = &u
return nil
}
}
@@ -411,19 +434,19 @@ func (s *ledger) apply(
Where("tenant_id = ? AND biz_ref_type = ? AND biz_ref_id = ? AND type = ?", tenantID, bizRefType, bizRefID, ledgerType).
First(&existing).Error; e2 == nil {
out.Ledger = &existing
out.TenantUser = &tu
out.User = &u
return nil
}
}
return err
}
tu.Balance = balanceAfter
tu.BalanceFrozen = frozenAfter
tu.UpdatedAt = now
u.Balance = balanceAfter
u.BalanceFrozen = frozenAfter
u.UpdatedAt = now
out.Ledger = ledger
out.TenantUser = &tu
out.User = &u
return nil
})
if err != nil {
@@ -450,8 +473,8 @@ func (s *ledger) apply(
"type": ledgerType,
"ledger_id": out.Ledger.ID,
"idempotency_key": idempotencyKey,
"balance_after": out.TenantUser.Balance,
"frozen_after": out.TenantUser.BalanceFrozen,
"balance_after": out.User.Balance,
"frozen_after": out.User.BalanceFrozen,
}).Info("services.ledger.apply.ok")
return &out, nil

View File

@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"errors"
"fmt"
"testing"
"time"
@@ -46,15 +47,24 @@ func Test_Ledger(t *testing.T) {
}
func (s *LedgerTestSuite) seedTenantUser(ctx context.Context, tenantID, userID, balance, frozen int64) {
database.Truncate(ctx, s.DB, models.TableNameTenantLedger, models.TableNameTenantUser)
database.Truncate(ctx, s.DB, models.TableNameTenantLedger, models.TableNameTenantUser, models.TableNameUser)
now := time.Now().UTC()
_, err := s.DB.ExecContext(ctx, `
INSERT INTO users (id, username, password, roles, status, metas, created_at, updated_at, balance, balance_frozen)
VALUES ($1, $2, 'x', ARRAY['user'], $3, '{}'::jsonb, $4, $4, $5, $6)
ON CONFLICT (id) DO UPDATE
SET balance = EXCLUDED.balance, balance_frozen = EXCLUDED.balance_frozen, updated_at = EXCLUDED.updated_at
`, userID, fmt.Sprintf("u%d", userID), consts.UserStatusVerified, now, balance, frozen)
So(err, ShouldBeNil)
tu := &models.TenantUser{
TenantID: tenantID,
UserID: userID,
Role: types.NewArray([]consts.TenantUserRole{consts.TenantUserRoleMember}),
Balance: balance,
BalanceFrozen: frozen,
Status: consts.UserStatusVerified,
TenantID: tenantID,
UserID: userID,
Role: types.NewArray([]consts.TenantUserRole{consts.TenantUserRoleMember}),
Status: consts.UserStatusVerified,
CreatedAt: now,
UpdatedAt: now,
}
So(tu.Create(ctx), ShouldBeNil)
}
@@ -82,7 +92,7 @@ func (s *LedgerTestSuite) Test_Freeze() {
So(err, ShouldBeNil)
So(res, ShouldNotBeNil)
So(res.Ledger, ShouldNotBeNil)
So(res.TenantUser, ShouldNotBeNil)
So(res.User, ShouldNotBeNil)
So(res.Ledger.Type, ShouldEqual, consts.TenantLedgerTypeFreeze)
So(res.Ledger.Amount, ShouldEqual, 300)
So(res.Ledger.BalanceBefore, ShouldEqual, 1000)
@@ -92,8 +102,8 @@ func (s *LedgerTestSuite) Test_Freeze() {
So(res.Ledger.OperatorUserID, ShouldEqual, userID)
So(res.Ledger.BizRefType, ShouldEqual, "")
So(res.Ledger.BizRefID, ShouldEqual, int64(0))
So(res.TenantUser.Balance, ShouldEqual, 700)
So(res.TenantUser.BalanceFrozen, ShouldEqual, 300)
So(res.User.Balance, ShouldEqual, 700)
So(res.User.BalanceFrozen, ShouldEqual, 300)
})
Convey("幂等键重复调用不应重复扣减", func() {
@@ -106,10 +116,10 @@ func (s *LedgerTestSuite) Test_Freeze() {
So(res2.Ledger, ShouldNotBeNil)
So(res2.Ledger.IdempotencyKey, ShouldEqual, "k_freeze_idem")
var tu2 models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, userID).First(&tu2).Error, ShouldBeNil)
So(tu2.Balance, ShouldEqual, 700)
So(tu2.BalanceFrozen, ShouldEqual, 300)
var u2 models.User
So(_db.WithContext(ctx).Where("id = ?", userID).First(&u2).Error, ShouldBeNil)
So(u2.Balance, ShouldEqual, 700)
So(u2.BalanceFrozen, ShouldEqual, 300)
})
Convey("余额不足应返回前置条件失败", func() {
@@ -157,8 +167,8 @@ func (s *LedgerTestSuite) Test_Unfreeze() {
So(res.Ledger.OperatorUserID, ShouldEqual, userID)
So(res.Ledger.BizRefType, ShouldEqual, "")
So(res.Ledger.BizRefID, ShouldEqual, int64(0))
So(res.TenantUser.Balance, ShouldEqual, 1000)
So(res.TenantUser.BalanceFrozen, ShouldEqual, 0)
So(res.User.Balance, ShouldEqual, 1000)
So(res.User.BalanceFrozen, ShouldEqual, 0)
})
Convey("幂等键重复调用不应重复入账", func() {
@@ -172,10 +182,10 @@ func (s *LedgerTestSuite) Test_Unfreeze() {
So(err, ShouldBeNil)
So(res2, ShouldNotBeNil)
var tu2 models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, userID).First(&tu2).Error, ShouldBeNil)
So(tu2.Balance, ShouldEqual, 1000)
So(tu2.BalanceFrozen, ShouldEqual, 0)
var u2 models.User
So(_db.WithContext(ctx).Where("id = ?", userID).First(&u2).Error, ShouldBeNil)
So(u2.Balance, ShouldEqual, 1000)
So(u2.BalanceFrozen, ShouldEqual, 0)
})
})
}
@@ -210,8 +220,8 @@ func (s *LedgerTestSuite) Test_DebitPurchaseTx() {
So(res.Ledger.OperatorUserID, ShouldEqual, userID)
So(res.Ledger.BizRefType, ShouldEqual, "order")
So(res.Ledger.BizRefID, ShouldEqual, int64(123))
So(res.TenantUser.Balance, ShouldEqual, 700)
So(res.TenantUser.BalanceFrozen, ShouldEqual, 0)
So(res.User.Balance, ShouldEqual, 700)
So(res.User.BalanceFrozen, ShouldEqual, 0)
})
Convey("幂等键重复调用不应重复扣减冻结余额", func() {
@@ -224,10 +234,10 @@ func (s *LedgerTestSuite) Test_DebitPurchaseTx() {
_, err = Ledger.DebitPurchaseTx(ctx, _db, tenantID, userID, userID, 123, 300, "k_debit_idem", "debit", now.Add(time.Second))
So(err, ShouldBeNil)
var tu2 models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, userID).First(&tu2).Error, ShouldBeNil)
So(tu2.Balance, ShouldEqual, 700)
So(tu2.BalanceFrozen, ShouldEqual, 0)
var u2 models.User
So(_db.WithContext(ctx).Where("id = ?", userID).First(&u2).Error, ShouldBeNil)
So(u2.Balance, ShouldEqual, 700)
So(u2.BalanceFrozen, ShouldEqual, 0)
})
})
}
@@ -259,8 +269,8 @@ func (s *LedgerTestSuite) Test_CreditRefundTx() {
So(res.Ledger.OperatorUserID, ShouldEqual, userID)
So(res.Ledger.BizRefType, ShouldEqual, "order")
So(res.Ledger.BizRefID, ShouldEqual, int64(123))
So(res.TenantUser.Balance, ShouldEqual, 1000)
So(res.TenantUser.BalanceFrozen, ShouldEqual, 0)
So(res.User.Balance, ShouldEqual, 1000)
So(res.User.BalanceFrozen, ShouldEqual, 0)
})
Convey("幂等键重复调用不应重复退款入账", func() {
@@ -274,48 +284,9 @@ func (s *LedgerTestSuite) Test_CreditRefundTx() {
_, err = Ledger.CreditRefundTx(ctx, _db, tenantID, userID, userID, 123, 300, "k_refund_idem", "refund", now.Add(time.Second))
So(err, ShouldBeNil)
var tu2 models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, userID).First(&tu2).Error, ShouldBeNil)
So(tu2.Balance, ShouldEqual, 1000)
})
})
}
func (s *LedgerTestSuite) Test_CreditTopupTx() {
Convey("Ledger.CreditTopupTx", s.T(), func() {
ctx := s.T().Context()
tenantID := int64(1)
userID := int64(2)
now := time.Now().UTC()
s.seedTenantUser(ctx, tenantID, userID, 1000, 0)
Convey("金额非法应返回参数错误", func() {
_, err := Ledger.CreditTopupTx(ctx, _db, tenantID, 999, userID, 456, 0, "k_topup_invalid_amount", "topup", now)
So(err, ShouldNotBeNil)
})
Convey("成功充值应增加可用余额并写入账本", func() {
res, err := Ledger.CreditTopupTx(ctx, _db, tenantID, 999, userID, 456, 200, "k_topup_1", "topup", now)
So(err, ShouldBeNil)
So(res, ShouldNotBeNil)
So(res.Ledger.Type, ShouldEqual, consts.TenantLedgerTypeCreditTopup)
So(res.Ledger.OperatorUserID, ShouldEqual, int64(999))
So(res.Ledger.BizRefType, ShouldEqual, "order")
So(res.Ledger.BizRefID, ShouldEqual, int64(456))
So(res.TenantUser.Balance, ShouldEqual, 1200)
So(res.TenantUser.BalanceFrozen, ShouldEqual, 0)
})
Convey("幂等键重复调用不应重复充值入账", func() {
_, err := Ledger.CreditTopupTx(ctx, _db, tenantID, 999, userID, 456, 200, "k_topup_idem", "topup", now)
So(err, ShouldBeNil)
_, err = Ledger.CreditTopupTx(ctx, _db, tenantID, 999, userID, 456, 200, "k_topup_idem", "topup", now.Add(time.Second))
So(err, ShouldBeNil)
var tu2 models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, userID).First(&tu2).Error, ShouldBeNil)
So(tu2.Balance, ShouldEqual, 1200)
var u2 models.User
So(_db.WithContext(ctx).Where("id = ?", userID).First(&u2).Error, ShouldBeNil)
So(u2.Balance, ShouldEqual, 1000)
})
})
}
@@ -328,7 +299,7 @@ func (s *LedgerTestSuite) Test_MyBalance() {
s.seedTenantUser(ctx, tenantID, userID, 1000, 200)
Convey("成功返回租户内余额", func() {
Convey("成功返回全局余额", func() {
m, err := Ledger.MyBalance(ctx, tenantID, userID)
So(err, ShouldBeNil)
So(m, ShouldNotBeNil)
@@ -352,9 +323,9 @@ func (s *LedgerTestSuite) Test_MyLedgerPage() {
s.seedTenantUser(ctx, tenantID, userID, 1000, 0)
_, err := Ledger.CreditTopupTx(ctx, _db, tenantID, userID, userID, 1, 200, "k_topup_for_page", "topup", now)
_, err := Ledger.Freeze(ctx, tenantID, userID, 1, 200, "k_freeze_for_page_1", "freeze", now)
So(err, ShouldBeNil)
_, err = Ledger.Freeze(ctx, tenantID, userID, 2, 100, "k_freeze_for_page", "freeze", now.Add(time.Second))
_, err = Ledger.Unfreeze(ctx, tenantID, userID, 1, 100, "k_unfreeze_for_page_1", "unfreeze", now.Add(time.Second))
So(err, ShouldBeNil)
Convey("分页返回流水列表", func() {
@@ -365,7 +336,7 @@ func (s *LedgerTestSuite) Test_MyLedgerPage() {
})
Convey("按 type 过滤", func() {
typ := consts.TenantLedgerTypeCreditTopup
typ := consts.TenantLedgerTypeFreeze
pager, err := Ledger.MyLedgerPage(ctx, tenantID, userID, &dto.MyLedgerListFilter{Type: &typ})
So(err, ShouldBeNil)
So(pager.Total, ShouldEqual, 1)
@@ -382,8 +353,8 @@ func (s *LedgerTestSuite) Test_AdminLedgerPage() {
s.seedTenantUser(ctx, tenantID, userID, 1000, 0)
// 模拟后台管理员为用户充值operator_user_id 与 user_id 不同。
_, err := Ledger.CreditTopupTx(ctx, _db, tenantID, 999, userID, 777, 200, "k_admin_topup_for_page", "topup", now)
// 模拟后台管理员为用户冻结资金operator_user_id 与 user_id 不同。
_, err := Ledger.FreezeTx(ctx, _db, tenantID, 999, userID, 777, 200, "k_admin_freeze_for_page", "freeze", now)
So(err, ShouldBeNil)
Convey("按 operator_user_id 过滤", func() {

View File

@@ -198,131 +198,6 @@ func (s *order) AdminOrderExportCSV(
}, nil
}
// AdminBatchTopupUsers 租户管理员批量为成员充值(逐条幂等,允许部分失败)。
func (s *order) AdminBatchTopupUsers(
ctx context.Context,
tenantID, operatorUserID int64,
form *dto.AdminBatchTopupForm,
now time.Time,
) (*dto.AdminBatchTopupResponse, error) {
if tenantID <= 0 || operatorUserID <= 0 {
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/operator_user_id must be > 0")
}
if form == nil {
return nil, errorx.ErrInvalidParameter.WithMsg("form is required")
}
if strings.TrimSpace(form.BatchIdempotencyKey) == "" {
return nil, errorx.ErrInvalidParameter.WithMsg("batch_idempotency_key is required")
}
if len(form.Items) == 0 {
return nil, errorx.ErrInvalidParameter.WithMsg("items is required")
}
if now.IsZero() {
now = time.Now()
}
// 批量充值属于高敏感操作:限制单次条数,避免拖垮系统。
const maxItems = 200
if len(form.Items) > maxItems {
return nil, errorx.ErrInvalidParameter.WithMsg("items too many")
}
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"operator_user_id": operatorUserID,
"batch_idempotency_key": form.BatchIdempotencyKey,
"total": len(form.Items),
}).Info("services.order.admin.batch_topup")
out := &dto.AdminBatchTopupResponse{
Total: len(form.Items),
Items: make([]*dto.AdminBatchTopupResultItem, 0, len(form.Items)),
}
for idx, item := range form.Items {
if item == nil {
out.Failed++
out.Items = append(out.Items, &dto.AdminBatchTopupResultItem{
OK: false,
ErrorCode: int(errorx.CodeInvalidParameter),
ErrorMessage: "item is nil",
})
continue
}
idemKey := strings.TrimSpace(item.IdempotencyKey)
if idemKey == "" {
// 关键语义:为空时用批次幂等键派生,确保批次重试不会重复入账。
idemKey = fmt.Sprintf("batch_topup:%s:%d:%d", strings.TrimSpace(form.BatchIdempotencyKey), item.UserID, idx)
}
resultItem := &dto.AdminBatchTopupResultItem{
UserID: item.UserID,
Amount: item.Amount,
IdempotencyKey: idemKey,
OK: false,
}
// 单条参数校验:失败只影响该条,不影响整批(便于运营侧修正后重试)。
if item.UserID <= 0 {
resultItem.ErrorCode = int(errorx.CodeInvalidParameter)
resultItem.ErrorMessage = "user_id must be > 0"
out.Failed++
out.Items = append(out.Items, resultItem)
continue
}
if item.Amount <= 0 {
resultItem.ErrorCode = int(errorx.CodeInvalidParameter)
resultItem.ErrorMessage = "amount must be > 0"
out.Failed++
out.Items = append(out.Items, resultItem)
continue
}
// 逐条调用单用户充值逻辑:保持“订单 + 账本 + 余额”一致性与幂等语义一致。
orderModel, err := s.AdminTopupUser(
ctx,
tenantID,
operatorUserID,
item.UserID,
item.Amount,
idemKey,
item.Reason,
now,
)
if err != nil {
// 错误收敛为可展示结构:便于前端逐条展示与导出审计。
var appErr *errorx.AppError
if errors.As(err, &appErr) {
resultItem.ErrorCode = int(appErr.Code)
resultItem.ErrorMessage = appErr.Message
} else {
resultItem.ErrorCode = int(errorx.CodeInternalError)
resultItem.ErrorMessage = err.Error()
}
out.Failed++
out.Items = append(out.Items, resultItem)
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"operator_user_id": operatorUserID,
"user_id": item.UserID,
"amount": item.Amount,
"idempotency_key": idemKey,
}).WithError(err).Warn("services.order.admin.batch_topup.item_failed")
continue
}
resultItem.OK = true
if orderModel != nil {
resultItem.OrderID = orderModel.ID
}
out.Success++
out.Items = append(out.Items, resultItem)
}
return out, nil
}
// PurchaseContentParams 定义“租户内使用余额购买内容”的入参。
type PurchaseContentParams struct {
// TenantID 租户 ID多租户隔离范围
@@ -349,7 +224,7 @@ type PurchaseContentResult struct {
AmountPaid int64
}
// order 提供订单域能力(购买、充值、退款、查询等)。
// order 提供订单域能力(购买、退款、查询等)。
//
// @provider
type order struct {
@@ -394,123 +269,6 @@ func (s *order) enqueueOrderRefundJob(args jobs_args.OrderRefundJob) error {
return s.job.Add(args)
}
// AdminTopupUser 租户管理员给租户成员充值(增加该租户下的可用余额)。
func (s *order) AdminTopupUser(
ctx context.Context,
tenantID, operatorUserID, targetUserID, amount int64,
idempotencyKey, reason string,
now time.Time,
) (*models.Order, error) {
if tenantID <= 0 || operatorUserID <= 0 || targetUserID <= 0 {
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/operator_user_id/target_user_id must be > 0")
}
if amount <= 0 {
return nil, errorx.ErrInvalidParameter.WithMsg("amount must be > 0")
}
if now.IsZero() {
now = time.Now()
}
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"operator_user": operatorUserID,
"target_user": targetUserID,
"amount": amount,
"idempotency_key": idempotencyKey,
}).Info("services.order.admin.topup_user")
var out models.Order
err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 关键前置条件:目标用户必须属于该租户(同时加行锁,避免并发余额写入冲突)。
var tu models.TenantUser
if err := tx.
Clauses(clause.Locking{Strength: "UPDATE"}).
Where("tenant_id = ? AND user_id = ?", tenantID, targetUserID).
First(&tu).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errorx.ErrPreconditionFailed.WithMsg("目标用户不属于该租户")
}
return err
}
// 充值幂等:按 orders(tenant_id,user_id,idempotency_key) 去重,避免重复入账。
if idempotencyKey != "" {
var existing models.Order
if err := tx.Where(
"tenant_id = ? AND user_id = ? AND idempotency_key = ?",
tenantID, targetUserID, idempotencyKey,
).First(&existing).Error; err == nil {
out = existing
return nil
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
}
// 先落订单paid再写入账本credit_topup确保“订单可追溯 + 账本可对账”。
snapshot := newOrderSnapshot(consts.OrderTypeTopup, &fields.OrdersTopupSnapshot{
OperatorUserID: operatorUserID,
TargetUserID: targetUserID,
Amount: amount,
Currency: consts.CurrencyCNY,
Reason: reason,
IdempotencyKey: idempotencyKey,
TopupAt: now,
})
orderModel := models.Order{
TenantID: tenantID,
UserID: targetUserID,
Type: consts.OrderTypeTopup,
Status: consts.OrderStatusPaid,
Currency: consts.CurrencyCNY,
AmountOriginal: amount,
AmountDiscount: 0,
AmountPaid: amount,
Snapshot: snapshot,
IdempotencyKey: idempotencyKey,
PaidAt: now,
CreatedAt: now,
UpdatedAt: now,
}
if err := tx.Create(&orderModel).Error; err != nil {
return err
}
// 账本幂等键固定使用 topup:<orderID>,保证同一订单不会重复入账。
ledgerKey := fmt.Sprintf("topup:%d", orderModel.ID)
remark := reason
if remark == "" {
remark = fmt.Sprintf("topup by tenant_admin:%d", operatorUserID)
}
if _, err := s.ledger.CreditTopupTx(ctx, tx, tenantID, operatorUserID, targetUserID, orderModel.ID, amount, ledgerKey, remark, now); err != nil {
return err
}
out = orderModel
return nil
})
if err != nil {
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"operator_user": operatorUserID,
"target_user": targetUserID,
"amount": amount,
"idempotency_key": idempotencyKey,
}).WithError(err).Warn("services.order.admin.topup_user.failed")
return nil, err
}
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"target_user": targetUserID,
"order_id": out.ID,
"amount": amount,
}).Info("services.order.admin.topup_user.ok")
return &out, nil
}
// MyOrderPage 分页查询当前用户在租户内的订单。
func (s *order) MyOrderPage(
ctx context.Context,

View File

@@ -60,15 +60,22 @@ func (s *OrderTestSuite) truncate(ctx context.Context, tableNames ...string) {
}
func (s *OrderTestSuite) seedTenantUser(ctx context.Context, tenantID, userID, balance, frozen int64) {
tu := &models.TenantUser{
TenantID: tenantID,
UserID: userID,
Role: types.NewArray([]consts.TenantUserRole{consts.TenantUserRoleMember}),
Balance: balance,
BalanceFrozen: frozen,
Status: consts.UserStatusVerified,
}
So(tu.Create(ctx), ShouldBeNil)
now := time.Now().UTC()
_, err := s.DB.ExecContext(ctx, `
INSERT INTO users (id, username, password, roles, status, metas, created_at, updated_at, balance, balance_frozen)
VALUES ($1, $2, 'x', ARRAY['user'], $3, '{}'::jsonb, $4, $4, $5, $6)
ON CONFLICT (id) DO UPDATE
SET balance = EXCLUDED.balance, balance_frozen = EXCLUDED.balance_frozen, updated_at = EXCLUDED.updated_at
`, userID, fmt.Sprintf("u%d", userID), consts.UserStatusVerified, now, balance, frozen)
So(err, ShouldBeNil)
_, err = s.DB.ExecContext(ctx, `
INSERT INTO tenant_users (tenant_id, user_id, role, status, created_at, updated_at)
VALUES ($1, $2, ARRAY['member'], $3, $4, $4)
ON CONFLICT (tenant_id, user_id) DO UPDATE
SET role = EXCLUDED.role, status = EXCLUDED.status, updated_at = EXCLUDED.updated_at
`, tenantID, userID, consts.UserStatusVerified, now)
So(err, ShouldBeNil)
}
func (s *OrderTestSuite) seedPublishedContent(ctx context.Context, tenantID, ownerUserID int64) *models.Content {
@@ -102,98 +109,6 @@ func (s *OrderTestSuite) seedContentPrice(ctx context.Context, tenantID, content
So(p.Create(ctx), ShouldBeNil)
}
func (s *OrderTestSuite) Test_AdminTopupUser() {
Convey("Order.AdminTopupUser", s.T(), func() {
ctx := s.T().Context()
now := time.Now().UTC()
tenantID := int64(1)
operatorUserID := int64(10)
targetUserID := int64(20)
s.truncate(
ctx,
models.TableNameTenantLedger,
models.TableNameOrderItem,
models.TableNameOrder,
models.TableNameTenantUser,
)
Convey("参数非法应返回错误", func() {
_, err := Order.AdminTopupUser(ctx, 0, operatorUserID, targetUserID, 100, "", "", now)
So(err, ShouldNotBeNil)
_, err = Order.AdminTopupUser(ctx, tenantID, 0, targetUserID, 100, "", "", now)
So(err, ShouldNotBeNil)
_, err = Order.AdminTopupUser(ctx, tenantID, operatorUserID, 0, 100, "", "", now)
So(err, ShouldNotBeNil)
_, err = Order.AdminTopupUser(ctx, tenantID, operatorUserID, targetUserID, 0, "", "", now)
So(err, ShouldNotBeNil)
})
Convey("目标用户不属于该租户应返回前置条件失败", func() {
_, err := Order.AdminTopupUser(ctx, tenantID, operatorUserID, targetUserID, 100, "idem_not_member", "", now)
So(err, ShouldNotBeNil)
var appErr *errorx.AppError
So(errors.As(err, &appErr), ShouldBeTrue)
So(appErr.Code, ShouldEqual, errorx.CodePreconditionFailed)
})
Convey("成功充值并写入账本", func() {
s.seedTenantUser(ctx, tenantID, targetUserID, 0, 0)
orderModel, err := Order.AdminTopupUser(ctx, tenantID, operatorUserID, targetUserID, 300, "idem_topup_1", "充值原因", now)
So(err, ShouldBeNil)
So(orderModel, ShouldNotBeNil)
So(orderModel.ID, ShouldBeGreaterThan, 0)
So(orderModel.Type, ShouldEqual, consts.OrderTypeTopup)
So(orderModel.Status, ShouldEqual, consts.OrderStatusPaid)
So(orderModel.AmountPaid, ShouldEqual, 300)
snap := orderModel.Snapshot.Data()
So(snap.Kind, ShouldEqual, string(consts.OrderTypeTopup))
var snapData fields.OrdersTopupSnapshot
So(json.Unmarshal(snap.Data, &snapData), ShouldBeNil)
So(snapData.OperatorUserID, ShouldEqual, operatorUserID)
So(snapData.TargetUserID, ShouldEqual, targetUserID)
So(snapData.Amount, ShouldEqual, int64(300))
var tu models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, targetUserID).First(&tu).Error, ShouldBeNil)
So(tu.Balance, ShouldEqual, 300)
var ledgers []*models.TenantLedger
So(_db.WithContext(ctx).
Where("tenant_id = ? AND user_id = ? AND type = ?", tenantID, targetUserID, consts.TenantLedgerTypeCreditTopup).
Order("id ASC").
Find(&ledgers).Error, ShouldBeNil)
So(len(ledgers), ShouldEqual, 1)
So(ledgers[0].OrderID, ShouldEqual, orderModel.ID)
So(ledgers[0].Amount, ShouldEqual, 300)
})
Convey("幂等键重复调用不应重复入账", func() {
s.seedTenantUser(ctx, tenantID, targetUserID, 0, 0)
o1, err := Order.AdminTopupUser(ctx, tenantID, operatorUserID, targetUserID, 300, "idem_topup_2", "充值原因", now)
So(err, ShouldBeNil)
So(o1, ShouldNotBeNil)
o2, err := Order.AdminTopupUser(ctx, tenantID, operatorUserID, targetUserID, 999, "idem_topup_2", "不同金额也不应重复处理", now.Add(time.Second))
So(err, ShouldBeNil)
So(o2, ShouldNotBeNil)
So(o2.ID, ShouldEqual, o1.ID)
var tu models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, targetUserID).First(&tu).Error, ShouldBeNil)
So(tu.Balance, ShouldEqual, 300)
})
})
}
func (s *OrderTestSuite) Test_MyOrderPage() {
Convey("Order.MyOrderPage", s.T(), func() {
ctx := s.T().Context()
@@ -654,45 +569,6 @@ func (s *OrderTestSuite) Test_AdminOrderPage() {
So(itemsDesc[0].CreatedAt.After(itemsDesc[1].CreatedAt), ShouldBeTrue)
})
Convey("按 type 过滤", func() {
s.truncate(ctx, models.TableNameOrderItem, models.TableNameOrder)
o1 := &models.Order{
TenantID: tenantID,
UserID: 2,
Type: consts.OrderTypeTopup,
Status: consts.OrderStatusPaid,
Currency: consts.CurrencyCNY,
AmountPaid: 100,
Snapshot: newLegacyOrderSnapshot(),
PaidAt: now,
CreatedAt: now,
UpdatedAt: now,
}
So(o1.Create(ctx), ShouldBeNil)
o2 := &models.Order{
TenantID: tenantID,
UserID: 3,
Type: consts.OrderTypeContentPurchase,
Status: consts.OrderStatusPaid,
Currency: consts.CurrencyCNY,
AmountPaid: 200,
Snapshot: newLegacyOrderSnapshot(),
PaidAt: now,
CreatedAt: now,
UpdatedAt: now,
}
So(o2.Create(ctx), ShouldBeNil)
typ := consts.OrderTypeTopup
pager, err := Order.AdminOrderPage(ctx, tenantID, &dto.AdminOrderListFilter{
Type: &typ,
})
So(err, ShouldBeNil)
So(pager.Total, ShouldEqual, 1)
})
Convey("组合筛选user_id + status + amount_paid 区间 + content_id", func() {
s.truncate(ctx, models.TableNameOrderItem, models.TableNameOrder)
@@ -888,123 +764,6 @@ func (s *OrderTestSuite) Test_AdminOrderExportCSV() {
})
}
func (s *OrderTestSuite) Test_AdminBatchTopupUsers() {
Convey("Order.AdminBatchTopupUsers", s.T(), func() {
ctx := s.T().Context()
now := time.Now().UTC()
tenantID := int64(1)
operatorUserID := int64(10)
s.truncate(
ctx,
models.TableNameTenantLedger,
models.TableNameOrderItem,
models.TableNameOrder,
models.TableNameTenantUser,
)
Convey("参数非法应返回错误", func() {
_, err := Order.AdminBatchTopupUsers(ctx, 0, operatorUserID, &dto.AdminBatchTopupForm{}, now)
So(err, ShouldNotBeNil)
_, err = Order.AdminBatchTopupUsers(ctx, tenantID, 0, &dto.AdminBatchTopupForm{}, now)
So(err, ShouldNotBeNil)
_, err = Order.AdminBatchTopupUsers(ctx, tenantID, operatorUserID, nil, now)
So(err, ShouldNotBeNil)
_, err = Order.AdminBatchTopupUsers(ctx, tenantID, operatorUserID, &dto.AdminBatchTopupForm{BatchIdempotencyKey: ""}, now)
So(err, ShouldNotBeNil)
_, err = Order.AdminBatchTopupUsers(ctx, tenantID, operatorUserID, &dto.AdminBatchTopupForm{BatchIdempotencyKey: "b1", Items: nil}, now)
So(err, ShouldNotBeNil)
})
Convey("超过单批次最大条数应返回错误", func() {
items := make([]*dto.AdminBatchTopupItem, 0, 201)
for i := 0; i < 201; i++ {
items = append(items, &dto.AdminBatchTopupItem{UserID: int64(1000 + i), Amount: 1})
}
_, err := Order.AdminBatchTopupUsers(ctx, tenantID, operatorUserID, &dto.AdminBatchTopupForm{
BatchIdempotencyKey: "too_many",
Items: items,
}, now)
So(err, ShouldNotBeNil)
})
Convey("单条参数不合法应只影响该条并返回错误明细", func() {
s.seedTenantUser(ctx, tenantID, 20, 0, 0)
form := &dto.AdminBatchTopupForm{
BatchIdempotencyKey: "batch_invalid_item",
Items: []*dto.AdminBatchTopupItem{
nil,
{UserID: 0, Amount: 100, Reason: "bad_user_id"},
{UserID: 20, Amount: 0, Reason: "bad_amount"},
{UserID: 20, Amount: 100, Reason: "ok"},
},
}
resp, err := Order.AdminBatchTopupUsers(ctx, tenantID, operatorUserID, form, now)
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.Total, ShouldEqual, 4)
So(resp.Success, ShouldEqual, 1)
So(resp.Failed, ShouldEqual, 3)
So(len(resp.Items), ShouldEqual, 4)
So(resp.Items[0].OK, ShouldBeFalse)
So(resp.Items[1].OK, ShouldBeFalse)
So(resp.Items[2].OK, ShouldBeFalse)
So(resp.Items[3].OK, ShouldBeTrue)
})
Convey("部分成功应返回明细结果且成功入账", func() {
// seed 2 个成员1 个非成员
s.seedTenantUser(ctx, tenantID, 20, 0, 0)
s.seedTenantUser(ctx, tenantID, 21, 0, 0)
form := &dto.AdminBatchTopupForm{
BatchIdempotencyKey: "batch_001",
Items: []*dto.AdminBatchTopupItem{
{UserID: 20, Amount: 100, Reason: "a"},
{UserID: 999, Amount: 100, Reason: "not_member"},
{UserID: 21, Amount: 200, Reason: "b"},
},
}
resp, err := Order.AdminBatchTopupUsers(ctx, tenantID, operatorUserID, form, now)
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.Total, ShouldEqual, 3)
So(resp.Success, ShouldEqual, 2)
So(resp.Failed, ShouldEqual, 1)
So(len(resp.Items), ShouldEqual, 3)
So(resp.Items[0].OK, ShouldBeTrue)
So(resp.Items[0].OrderID, ShouldBeGreaterThan, 0)
So(resp.Items[1].OK, ShouldBeFalse)
So(resp.Items[2].OK, ShouldBeTrue)
var tu20 models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, int64(20)).First(&tu20).Error, ShouldBeNil)
So(tu20.Balance, ShouldEqual, 100)
var tu21 models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, int64(21)).First(&tu21).Error, ShouldBeNil)
So(tu21.Balance, ShouldEqual, 200)
Convey("同一批次重复调用应幂等,不重复入账", func() {
resp2, err := Order.AdminBatchTopupUsers(ctx, tenantID, operatorUserID, form, now.Add(time.Second))
So(err, ShouldBeNil)
So(resp2.Success, ShouldEqual, 2)
var tu20b models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, int64(20)).First(&tu20b).Error, ShouldBeNil)
So(tu20b.Balance, ShouldEqual, 100)
var tu21b models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, int64(21)).First(&tu21b).Error, ShouldBeNil)
So(tu21b.Balance, ShouldEqual, 200)
})
})
})
}
func (s *OrderTestSuite) Test_AdminRefundOrder() {
Convey("Order.AdminRefundOrder", s.T(), func() {
ctx := s.T().Context()
@@ -1020,6 +779,7 @@ func (s *OrderTestSuite) Test_AdminRefundOrder() {
models.TableNameOrderItem,
models.TableNameOrder,
models.TableNameTenantUser,
models.TableNameUser,
)
Convey("参数非法应返回错误", func() {
@@ -1077,8 +837,8 @@ func (s *OrderTestSuite) Test_AdminRefundOrder() {
So(err, ShouldNotBeNil)
})
Convey("成功退款应回收权益并入账", func() {
s.seedTenantUser(ctx, tenantID, buyerUserID, 0, 0)
Convey("成功退款应回收权益并入账", func() {
s.seedTenantUser(ctx, tenantID, buyerUserID, 0, 0)
contentID := int64(123)
orderModel := &models.Order{
@@ -1122,45 +882,45 @@ func (s *OrderTestSuite) Test_AdminRefundOrder() {
}
So(access.Create(ctx), ShouldBeNil)
refunding, err := Order.AdminRefundOrder(ctx, tenantID, operatorUserID, orderModel.ID, false, "原因", "", now.Add(time.Minute))
So(err, ShouldBeNil)
So(refunding, ShouldNotBeNil)
So(refunding.Status, ShouldEqual, consts.OrderStatusRefunding)
refunding, err := Order.AdminRefundOrder(ctx, tenantID, operatorUserID, orderModel.ID, false, "原因", "", now.Add(time.Minute))
So(err, ShouldBeNil)
So(refunding, ShouldNotBeNil)
So(refunding.Status, ShouldEqual, consts.OrderStatusRefunding)
// refunding 期间重复请求应幂等返回 refunding并允许重复触发入队不影响最终结果
refunding2, err := Order.AdminRefundOrder(ctx, tenantID, operatorUserID, orderModel.ID, false, "原因2", "", now.Add(90*time.Second))
So(err, ShouldBeNil)
So(refunding2, ShouldNotBeNil)
So(refunding2.Status, ShouldEqual, consts.OrderStatusRefunding)
// refunding 期间重复请求应幂等返回 refunding并允许重复触发入队不影响最终结果
refunding2, err := Order.AdminRefundOrder(ctx, tenantID, operatorUserID, orderModel.ID, false, "原因2", "", now.Add(90*time.Second))
So(err, ShouldBeNil)
So(refunding2, ShouldNotBeNil)
So(refunding2.Status, ShouldEqual, consts.OrderStatusRefunding)
refunded, err := Order.ProcessRefundingOrder(ctx, &ProcessRefundingOrderParams{
TenantID: tenantID,
OrderID: orderModel.ID,
OperatorUserID: operatorUserID,
Force: false,
Reason: "原因",
Now: now.Add(2 * time.Minute),
})
So(err, ShouldBeNil)
So(refunded, ShouldNotBeNil)
So(refunded.Status, ShouldEqual, consts.OrderStatusRefunded)
refunded, err := Order.ProcessRefundingOrder(ctx, &ProcessRefundingOrderParams{
TenantID: tenantID,
OrderID: orderModel.ID,
OperatorUserID: operatorUserID,
Force: false,
Reason: "原因",
Now: now.Add(2 * time.Minute),
})
So(err, ShouldBeNil)
So(refunded, ShouldNotBeNil)
So(refunded.Status, ShouldEqual, consts.OrderStatusRefunded)
// worker 重试/重复执行应幂等:不重复入账、不重复回收权益。
refundedRetry, err := Order.ProcessRefundingOrder(ctx, &ProcessRefundingOrderParams{
TenantID: tenantID,
OrderID: orderModel.ID,
OperatorUserID: operatorUserID,
Force: false,
Reason: "原因",
Now: now.Add(5 * time.Minute),
})
So(err, ShouldBeNil)
So(refundedRetry, ShouldNotBeNil)
So(refundedRetry.Status, ShouldEqual, consts.OrderStatusRefunded)
// worker 重试/重复执行应幂等:不重复入账、不重复回收权益。
refundedRetry, err := Order.ProcessRefundingOrder(ctx, &ProcessRefundingOrderParams{
TenantID: tenantID,
OrderID: orderModel.ID,
OperatorUserID: operatorUserID,
Force: false,
Reason: "原因",
Now: now.Add(5 * time.Minute),
})
So(err, ShouldBeNil)
So(refundedRetry, ShouldNotBeNil)
So(refundedRetry.Status, ShouldEqual, consts.OrderStatusRefunded)
var tu models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, buyerUserID).First(&tu).Error, ShouldBeNil)
So(tu.Balance, ShouldEqual, 300)
var u models.User
So(_db.WithContext(ctx).Where("id = ?", buyerUserID).First(&u).Error, ShouldBeNil)
So(u.Balance, ShouldEqual, 300)
var access2 models.ContentAccess
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ? AND content_id = ?", tenantID, buyerUserID, contentID).First(&access2).Error, ShouldBeNil)
@@ -1171,31 +931,31 @@ func (s *OrderTestSuite) Test_AdminRefundOrder() {
So(err, ShouldBeNil)
So(refunded2.Status, ShouldEqual, consts.OrderStatusRefunded)
var tu2 models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, buyerUserID).First(&tu2).Error, ShouldBeNil)
So(tu2.Balance, ShouldEqual, 300)
var u2 models.User
So(_db.WithContext(ctx).Where("id = ?", buyerUserID).First(&u2).Error, ShouldBeNil)
So(u2.Balance, ShouldEqual, 300)
var ledgers []*models.TenantLedger
So(_db.WithContext(ctx).
Where("tenant_id = ? AND user_id = ? AND idempotency_key = ?", tenantID, buyerUserID, fmt.Sprintf("refund:%d", orderModel.ID)).
Find(&ledgers).Error, ShouldBeNil)
So(len(ledgers), ShouldEqual, 1)
})
Convey("不可重试错误分类应稳定", func() {
So(IsRefundJobNonRetryableError(nil), ShouldBeFalse)
So(IsRefundJobNonRetryableError(errors.New("x")), ShouldBeFalse)
So(IsRefundJobNonRetryableError(errorx.ErrInvalidParameter), ShouldBeTrue)
So(IsRefundJobNonRetryableError(errorx.ErrRecordNotFound), ShouldBeTrue)
So(IsRefundJobNonRetryableError(errorx.ErrStatusConflict), ShouldBeTrue)
So(IsRefundJobNonRetryableError(errorx.ErrPreconditionFailed), ShouldBeTrue)
So(IsRefundJobNonRetryableError(errorx.ErrPermissionDenied), ShouldBeTrue)
So(IsRefundJobNonRetryableError(errorx.ErrInternalError), ShouldBeFalse)
})
So(_db.WithContext(ctx).
Where("tenant_id = ? AND user_id = ? AND idempotency_key = ?", tenantID, buyerUserID, fmt.Sprintf("refund:%d", orderModel.ID)).
Find(&ledgers).Error, ShouldBeNil)
So(len(ledgers), ShouldEqual, 1)
})
}
Convey("不可重试错误分类应稳定", func() {
So(IsRefundJobNonRetryableError(nil), ShouldBeFalse)
So(IsRefundJobNonRetryableError(errors.New("x")), ShouldBeFalse)
So(IsRefundJobNonRetryableError(errorx.ErrInvalidParameter), ShouldBeTrue)
So(IsRefundJobNonRetryableError(errorx.ErrRecordNotFound), ShouldBeTrue)
So(IsRefundJobNonRetryableError(errorx.ErrStatusConflict), ShouldBeTrue)
So(IsRefundJobNonRetryableError(errorx.ErrPreconditionFailed), ShouldBeTrue)
So(IsRefundJobNonRetryableError(errorx.ErrPermissionDenied), ShouldBeTrue)
So(IsRefundJobNonRetryableError(errorx.ErrInternalError), ShouldBeFalse)
})
})
}
func (s *OrderTestSuite) Test_PurchaseContent() {
Convey("Order.PurchaseContent", s.T(), func() {
@@ -1302,6 +1062,7 @@ func (s *OrderTestSuite) Test_PurchaseContent() {
models.TableNameContentPrice,
models.TableNameContent,
models.TableNameTenantUser,
models.TableNameUser,
)
s.seedTenantUser(ctx, tenantID, buyerUserID, 1000, 0)
content := s.seedPublishedContent(ctx, tenantID, ownerUserID)
@@ -1335,10 +1096,10 @@ func (s *OrderTestSuite) Test_PurchaseContent() {
So(itemSnap.ContentID, ShouldEqual, content.ID)
So(itemSnap.AmountPaid, ShouldEqual, int64(300))
var tu models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, buyerUserID).First(&tu).Error, ShouldBeNil)
So(tu.Balance, ShouldEqual, 700)
So(tu.BalanceFrozen, ShouldEqual, 0)
var u models.User
So(_db.WithContext(ctx).Where("id = ?", buyerUserID).First(&u).Error, ShouldBeNil)
So(u.Balance, ShouldEqual, 700)
So(u.BalanceFrozen, ShouldEqual, 0)
res2, err := Order.PurchaseContent(ctx, &PurchaseContentParams{
TenantID: tenantID,
@@ -1350,10 +1111,10 @@ func (s *OrderTestSuite) Test_PurchaseContent() {
So(err, ShouldBeNil)
So(res2.Order.ID, ShouldEqual, res1.Order.ID)
var tu2 models.TenantUser
So(_db.WithContext(ctx).Where("tenant_id = ? AND user_id = ?", tenantID, buyerUserID).First(&tu2).Error, ShouldBeNil)
So(tu2.Balance, ShouldEqual, 700)
So(tu2.BalanceFrozen, ShouldEqual, 0)
var u2 models.User
So(_db.WithContext(ctx).Where("id = ?", buyerUserID).First(&u2).Error, ShouldBeNil)
So(u2.Balance, ShouldEqual, 700)
So(u2.BalanceFrozen, ShouldEqual, 0)
})
Convey("存在回滚标记时应稳定返回“失败+已回滚”", func() {
@@ -1366,6 +1127,7 @@ func (s *OrderTestSuite) Test_PurchaseContent() {
models.TableNameContentPrice,
models.TableNameContent,
models.TableNameTenantUser,
models.TableNameUser,
)
s.seedTenantUser(ctx, tenantID, buyerUserID, 1000, 0)
content := s.seedPublishedContent(ctx, tenantID, ownerUserID)

View File

@@ -278,20 +278,23 @@ func (t *tenant) TenantUserBalanceMapping(ctx context.Context, tenantIds []int64
return result, nil
}
tbl, query := models.TenantUserQuery.QueryContext(ctx)
var items []struct {
TenantID int64
Balance int64
}
err := query.
Select(
tbl.TenantID,
tbl.Balance.Sum().As("balance"),
).
Where(tbl.TenantID.In(tenantIds...)).
Group(tbl.TenantID).
Scan(&items)
// 全局余额:按租户维度统计“该租户成员的 users.balance 之和”。
// 注意:用户可能加入多个租户,因此不同租户的统计会出现重复计入(这符合“按租户视角”统计的直觉)。
err := models.Q.TenantUser.
WithContext(ctx).
UnderlyingDB().
Table(models.TableNameTenantUser+" tu").
Select("tu.tenant_id, COALESCE(SUM(u.balance), 0) AS balance").
Joins("JOIN "+models.TableNameUser+" u ON u.id = tu.user_id AND u.deleted_at IS NULL").
Where("tu.tenant_id IN ?", tenantIds).
Group("tu.tenant_id").
Scan(&items).
Error
if err != nil {
return nil, err
}

View File

@@ -265,14 +265,12 @@ func (t *tenant) JoinByInvite(ctx context.Context, tenantID, userID int64, invit
// 加入租户:默认 member + verified与 tenant.AddUser 保持一致。
tu := &models.TenantUser{
TenantID: tenantID,
UserID: userID,
Role: types.NewArray([]consts.TenantUserRole{consts.TenantUserRoleMember}),
Status: consts.UserStatusVerified,
Balance: 0,
BalanceFrozen: 0,
CreatedAt: now,
UpdatedAt: now,
TenantID: tenantID,
UserID: userID,
Role: types.NewArray([]consts.TenantUserRole{consts.TenantUserRoleMember}),
Status: consts.UserStatusVerified,
CreatedAt: now,
UpdatedAt: now,
}
if err := tx.Create(tu).Error; err != nil {
if isUniqueViolation(err) {
@@ -430,14 +428,12 @@ func (t *tenant) AdminApproveJoinRequest(ctx context.Context, tenantID, operator
// 先落成员关系,再更新申请状态,保证“通过后一定能成为成员”(至少幂等)。
tu := &models.TenantUser{
TenantID: tenantID,
UserID: req.UserID,
Role: types.NewArray([]consts.TenantUserRole{consts.TenantUserRoleMember}),
Status: consts.UserStatusVerified,
Balance: 0,
BalanceFrozen: 0,
CreatedAt: now,
UpdatedAt: now,
TenantID: tenantID,
UserID: req.UserID,
Role: types.NewArray([]consts.TenantUserRole{consts.TenantUserRoleMember}),
Status: consts.UserStatusVerified,
CreatedAt: now,
UpdatedAt: now,
}
if err := tx.Create(tu).Error; err != nil && !isUniqueViolation(err) {
return err