feat: 添加订单退款处理的异步任务及相关逻辑

This commit is contained in:
2025-12-22 23:03:18 +08:00
parent 56256a6fb8
commit ead821ac2c
7 changed files with 337 additions and 35 deletions

View File

@@ -9,6 +9,8 @@ import (
"quyun/v2/app/commands"
"quyun/v2/app/jobs"
"quyun/v2/app/services"
"quyun/v2/database"
"quyun/v2/providers/app"
"quyun/v2/providers/job"
"quyun/v2/providers/postgres"
@@ -22,6 +24,7 @@ func defaultProviders() container.Providers {
return commands.Default(container.Providers{
postgres.DefaultProvider(),
job.DefaultProvider(),
database.DefaultProvider(),
}...)
}
@@ -34,6 +37,7 @@ func Command() atom.Option {
defaultProviders().
With(
jobs.Provide,
services.Provide,
),
),
)

View File

@@ -0,0 +1,45 @@
package args
import (
"quyun/v2/providers/job"
. "github.com/riverqueue/river"
"go.ipao.vip/atom/contracts"
)
var _ contracts.JobArgs = OrderRefundJob{}
// OrderRefundJob 表示“订单退款处理”的一次性异步任务。
//
// 设计说明:
// - 该任务用于将订单从 refunding 推进到 refunded/failed由 worker 执行核心退款逻辑)。
// - 幂等由 River Unique + 业务侧 DB 幂等ledger idempotency、状态机共同保证。
type OrderRefundJob struct {
// TenantID 租户ID用于多租户隔离与唯一性维度
TenantID int64 `json:"tenant_id" river:"unique"`
// OrderID 订单ID用于唯一性维度
OrderID int64 `json:"order_id" river:"unique"`
// OperatorUserID 退款操作人用户ID租户管理员/系统),用于 worker 在必要时回填审计字段。
OperatorUserID int64 `json:"operator_user_id"`
// Force 是否强制退款(绕过时间窗)。
Force bool `json:"force"`
// Reason 退款原因用于审计worker 可用于补齐订单字段)。
Reason string `json:"reason"`
}
func (OrderRefundJob) Kind() string { return "order_refund" }
func (a OrderRefundJob) UniqueID() string { return a.Kind() }
func (OrderRefundJob) InsertOpts() InsertOpts {
return InsertOpts{
Queue: job.QueueDefault,
Priority: job.PriorityDefault,
// 失败可重试;由 worker 判断不可重试的场景并 JobCancel。
MaxAttempts: 10,
UniqueOpts: UniqueOpts{
ByArgs: true,
},
}
}

View File

@@ -0,0 +1,58 @@
package jobs
import (
"context"
"time"
jobs_args "quyun/v2/app/jobs/args"
"quyun/v2/app/services"
. "github.com/riverqueue/river"
log "github.com/sirupsen/logrus"
)
var _ Worker[jobs_args.OrderRefundJob] = (*OrderRefundJobWorker)(nil)
// OrderRefundJobWorker 负责执行订单退款的异步处理。
//
// @provider(job)
type OrderRefundJobWorker struct {
WorkerDefaults[jobs_args.OrderRefundJob]
}
func (w *OrderRefundJobWorker) Work(ctx context.Context, job *Job[jobs_args.OrderRefundJob]) error {
args := job.Args
logger := log.WithFields(log.Fields{
"job_kind": args.Kind(),
"tenant_id": args.TenantID,
"order_id": args.OrderID,
"operator_user_id": args.OperatorUserID,
"force": args.Force,
"attempt": job.Attempt,
})
// 只允许在异步 worker 层做执行,不要在这里再次入队其他任务(避免耦合与递归依赖)。
logger.Info("jobs.order_refund.start")
_, err := services.Order.ProcessRefundingOrder(ctx, &services.ProcessRefundingOrderParams{
TenantID: args.TenantID,
OrderID: args.OrderID,
OperatorUserID: args.OperatorUserID,
Force: args.Force,
Reason: args.Reason,
Now: time.Now().UTC(),
})
if err != nil {
// 业务层会返回可识别的“不可重试”错误由它内部完成状态落库failed这里直接 cancel。
if services.IsRefundJobNonRetryableError(err) {
logger.WithError(err).Warn("jobs.order_refund.cancel")
return JobCancel(err)
}
logger.WithError(err).Warn("jobs.order_refund.retry")
return err
}
logger.Info("jobs.order_refund.ok")
return nil
}

View File

@@ -37,5 +37,17 @@ func Provide(opts ...opt.Option) error {
}, atom.GroupInitial); err != nil {
return err
}
if err := container.Container.Provide(func(
__job *job.Job,
) (contracts.Initial, error) {
obj := &OrderRefundJobWorker{}
if err := river.AddWorkerSafely(__job.Workers, obj); err != nil {
return nil, err
}
return obj, nil
}, atom.GroupInitial); err != nil {
return err
}
return nil
}

View File

@@ -12,11 +12,13 @@ import (
"quyun/v2/app/errorx"
"quyun/v2/app/http/tenant/dto"
jobs_args "quyun/v2/app/jobs/args"
"quyun/v2/app/requests"
"quyun/v2/database"
"quyun/v2/database/fields"
"quyun/v2/database/models"
"quyun/v2/pkg/consts"
provider_job "quyun/v2/providers/job"
pkgerrors "github.com/pkg/errors"
"github.com/samber/lo"
@@ -41,7 +43,11 @@ func newOrderSnapshot(kind consts.OrderType, payload any) types.JSONType[fields.
}
// AdminOrderExportCSV 租户管理员导出订单列表CSV 文本)。
func (s *order) AdminOrderExportCSV(ctx context.Context, tenantID int64, filter *dto.AdminOrderListFilter) (*dto.AdminOrderExportResponse, error) {
func (s *order) AdminOrderExportCSV(
ctx context.Context,
tenantID int64,
filter *dto.AdminOrderListFilter,
) (*dto.AdminOrderExportResponse, error) {
if tenantID <= 0 {
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id must be > 0")
}
@@ -349,6 +355,43 @@ type PurchaseContentResult struct {
type order struct {
db *gorm.DB
ledger *ledger
job *provider_job.Job
}
type ProcessRefundingOrderParams struct {
// TenantID 租户ID。
TenantID int64
// OrderID 订单ID。
OrderID int64
// OperatorUserID 退款操作人用户ID用于补齐订单审计字段
OperatorUserID int64
// Force 是否强制退款(用于补齐订单审计字段)。
Force bool
// Reason 退款原因(用于补齐订单审计字段)。
Reason string
// Now 逻辑时间(用于 refunded_at/updated_at
Now time.Time
}
func IsRefundJobNonRetryableError(err error) bool {
var appErr *errorx.AppError
if !errors.As(err, &appErr) {
return false
}
switch appErr.Code {
case errorx.CodeInvalidParameter,
errorx.CodeRecordNotFound,
errorx.CodeStatusConflict,
errorx.CodePreconditionFailed,
errorx.CodePermissionDenied:
return true
default:
return false
}
}
func (s *order) enqueueOrderRefundJob(args jobs_args.OrderRefundJob) error {
return s.job.Add(args)
}
// AdminTopupUser 租户管理员给租户成员充值(增加该租户下的可用余额)。
@@ -747,11 +790,15 @@ func (s *order) AdminRefundOrder(
return err
}
// 状态机:已退款直接幂等返回;仅允许已支付订单退款
// 状态机:已退款/退款中直接幂等返回;仅允许已支付订单发起退款请求
if orderModel.Status == consts.OrderStatusRefunded {
out = &orderModel
return nil
}
if orderModel.Status == consts.OrderStatusRefunding {
out = &orderModel
return nil
}
if orderModel.Status != consts.OrderStatusPaid {
return errorx.ErrStatusConflict.WithMsg("订单非已支付状态,无法退款")
}
@@ -767,38 +814,11 @@ func (s *order) AdminRefundOrder(
}
}
amount := orderModel.AmountPaid
refundKey := fmt.Sprintf("refund:%d", orderModel.ID)
// 先退余额(账本入账),后更新订单状态与权益,确保退款可对账且可追溯。
if amount > 0 {
if _, err := s.ledger.CreditRefundTx(ctx, tx, tenantID, operatorUserID, orderModel.UserID, orderModel.ID, amount, refundKey, reason, now); err != nil {
return err
}
}
// 退款对权益:立即回收 content_accessrevoked
for _, item := range orderModel.Items {
if item == nil {
continue
}
if err := tx.Table(models.TableNameContentAccess).
Where("tenant_id = ? AND user_id = ? AND content_id = ?", tenantID, orderModel.UserID, item.ContentID).
Updates(map[string]any{
"status": consts.ContentAccessStatusRevoked,
"revoked_at": now,
"updated_at": now,
}).Error; err != nil {
return err
}
}
// 最后更新订单退款字段,保证退款后的最终状态一致。
// 将订单推进到 refunding并记录本次请求的审计字段实际退款逻辑由异步 job 完成。
if err := tx.Table(models.TableNameOrder).
Where("id = ?", orderModel.ID).
Updates(map[string]any{
"status": consts.OrderStatusRefunded,
"refunded_at": now,
"status": consts.OrderStatusRefunding,
"refund_forced": force,
"refund_operator_user_id": operatorUserID,
"refund_reason": reason,
@@ -807,8 +827,7 @@ func (s *order) AdminRefundOrder(
return err
}
orderModel.Status = consts.OrderStatusRefunded
orderModel.RefundedAt = now
orderModel.Status = consts.OrderStatusRefunding
orderModel.RefundForced = force
orderModel.RefundOperatorUserID = operatorUserID
orderModel.RefundReason = reason
@@ -828,6 +847,30 @@ func (s *order) AdminRefundOrder(
return nil, err
}
// refunding 状态需要确保异步任务已入队:入队失败则返回错误,调用方可重试(幂等)。
if out != nil && out.Status == consts.OrderStatusRefunding {
err := s.enqueueOrderRefundJob(jobs_args.OrderRefundJob{
TenantID: tenantID,
OrderID: out.ID,
OperatorUserID: operatorUserID,
Force: force,
Reason: reason,
})
if err != nil {
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"operator_user_id": operatorUserID,
"order_id": out.ID,
}).WithError(err).Warn("services.order.admin.refund.enqueue_failed")
return nil, err
}
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"operator_user_id": operatorUserID,
"order_id": out.ID,
}).Info("services.order.admin.refund.enqueued")
}
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"operator_user_id": operatorUserID,
@@ -839,6 +882,131 @@ func (s *order) AdminRefundOrder(
return out, nil
}
// ProcessRefundingOrder 处理 refunding 订单:退余额、回收权益、推进到 refunded。
// 供异步 job/worker 调用;需保持幂等。
func (s *order) ProcessRefundingOrder(ctx context.Context, params *ProcessRefundingOrderParams) (*models.Order, error) {
if params == nil {
return nil, errorx.ErrInvalidParameter.WithMsg("params is required")
}
if params.TenantID <= 0 || params.OrderID <= 0 {
return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/order_id must be > 0")
}
if params.Now.IsZero() {
params.Now = time.Now()
}
logrus.WithFields(logrus.Fields{
"tenant_id": params.TenantID,
"order_id": params.OrderID,
"operator_user_id": params.OperatorUserID,
"force": params.Force,
}).Info("services.order.refund.process")
var out *models.Order
err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var orderModel models.Order
if err := tx.
Clauses(clause.Locking{Strength: "UPDATE"}).
Preload("Items").
Where("tenant_id = ? AND id = ?", params.TenantID, params.OrderID).
First(&orderModel).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return errorx.ErrRecordNotFound.WithMsg("order not found")
}
return err
}
// 幂等:已退款/已失败直接返回。
if orderModel.Status == consts.OrderStatusRefunded || orderModel.Status == consts.OrderStatusFailed {
out = &orderModel
return nil
}
// 仅允许 refunding 状态进入处理链路paid->refunding 必须由接口层完成并记录审计字段。
if orderModel.Status != consts.OrderStatusRefunding {
// 不可重试:状态不符合预期,直接标记 failed避免 job 无限重试。
_ = tx.Table(models.TableNameOrder).
Where("id = ?", orderModel.ID).
Updates(map[string]any{
"status": consts.OrderStatusFailed,
"updated_at": params.Now,
}).Error
return errorx.ErrStatusConflict.WithMsg("order not in refunding status")
}
// 补齐审计字段:以订单字段为准;若为空则用 job 参数兜底(避免历史数据/异常路径导致缺失)。
operatorUserID := orderModel.RefundOperatorUserID
if operatorUserID == 0 {
operatorUserID = params.OperatorUserID
}
reason := orderModel.RefundReason
if strings.TrimSpace(reason) == "" {
reason = strings.TrimSpace(params.Reason)
}
force := orderModel.RefundForced
if !force {
force = params.Force
}
amount := orderModel.AmountPaid
refundKey := fmt.Sprintf("refund:%d", orderModel.ID)
// 先退余额(账本入账),后回收权益,最后推进订单终态:保证退款可对账且可追溯。
if amount > 0 {
if _, err := s.ledger.CreditRefundTx(ctx, tx, params.TenantID, operatorUserID, orderModel.UserID, orderModel.ID, amount, refundKey, reason, params.Now); err != nil {
return err
}
}
for _, item := range orderModel.Items {
if item == nil {
continue
}
if err := tx.Table(models.TableNameContentAccess).
Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, orderModel.UserID, item.ContentID).
Updates(map[string]any{
"status": consts.ContentAccessStatusRevoked,
"revoked_at": params.Now,
"updated_at": params.Now,
}).Error; err != nil {
return err
}
}
if err := tx.Table(models.TableNameOrder).
Where("id = ?", orderModel.ID).
Updates(map[string]any{
"status": consts.OrderStatusRefunded,
"refunded_at": params.Now,
"refund_forced": force,
"refund_operator_user_id": operatorUserID,
"refund_reason": reason,
"updated_at": params.Now,
}).Error; err != nil {
return err
}
orderModel.Status = consts.OrderStatusRefunded
orderModel.RefundedAt = params.Now
orderModel.RefundForced = force
orderModel.RefundOperatorUserID = operatorUserID
orderModel.RefundReason = reason
orderModel.UpdatedAt = params.Now
out = &orderModel
return nil
})
if err != nil {
// 不可重试错误由 worker 负责 JobCancel这里保持原始 error 以便判定。
logrus.WithFields(logrus.Fields{
"tenant_id": params.TenantID,
"order_id": params.OrderID,
}).WithError(err).Warn("services.order.refund.process.failed")
return nil, err
}
return out, nil
}
func (s *order) PurchaseContent(ctx context.Context, params *PurchaseContentParams) (*PurchaseContentResult, error) {
if params == nil {
return nil, errorx.ErrInvalidParameter.WithMsg("params is required")

View File

@@ -1122,7 +1122,19 @@ func (s *OrderTestSuite) Test_AdminRefundOrder() {
}
So(access.Create(ctx), ShouldBeNil)
refunded, err := Order.AdminRefundOrder(ctx, tenantID, operatorUserID, orderModel.ID, false, "原因", "", now.Add(time.Minute))
refunding, err := Order.AdminRefundOrder(ctx, tenantID, operatorUserID, orderModel.ID, false, "原因", "", now.Add(time.Minute))
So(err, ShouldBeNil)
So(refunding, ShouldNotBeNil)
So(refunding.Status, ShouldEqual, consts.OrderStatusRefunding)
refunded, err := Order.ProcessRefundingOrder(ctx, &ProcessRefundingOrderParams{
TenantID: tenantID,
OrderID: orderModel.ID,
OperatorUserID: operatorUserID,
Force: false,
Reason: "原因",
Now: now.Add(2 * time.Minute),
})
So(err, ShouldBeNil)
So(refunded, ShouldNotBeNil)
So(refunded.Status, ShouldEqual, consts.OrderStatusRefunded)
@@ -1136,7 +1148,7 @@ func (s *OrderTestSuite) Test_AdminRefundOrder() {
So(access2.Status, ShouldEqual, consts.ContentAccessStatusRevoked)
So(access2.RevokedAt.IsZero(), ShouldBeFalse)
refunded2, err := Order.AdminRefundOrder(ctx, tenantID, operatorUserID, orderModel.ID, false, "原因2", "", now.Add(2*time.Minute))
refunded2, err := Order.AdminRefundOrder(ctx, tenantID, operatorUserID, orderModel.ID, false, "原因2", "", now.Add(3*time.Minute))
So(err, ShouldBeNil)
So(refunded2.Status, ShouldEqual, consts.OrderStatusRefunded)

View File

@@ -1,6 +1,7 @@
package services
import (
provider_job "quyun/v2/providers/job"
provider_jwt "quyun/v2/providers/jwt"
"go.ipao.vip/atom"
@@ -49,10 +50,12 @@ func Provide(opts ...opt.Option) error {
}
if err := container.Container.Provide(func(
db *gorm.DB,
job *provider_job.Job,
ledger *ledger,
) (*order, error) {
obj := &order{
db: db,
job: job,
ledger: ledger,
}