From ead821ac2cfb66aec348aba2706fd61cdf163c49 Mon Sep 17 00:00:00 2001 From: Rogee Date: Mon, 22 Dec 2025 23:03:18 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E8=AE=A2=E5=8D=95?= =?UTF-8?q?=E9=80=80=E6=AC=BE=E5=A4=84=E7=90=86=E7=9A=84=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=8F=8A=E7=9B=B8=E5=85=B3=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/app/commands/queue/river.go | 4 + backend/app/jobs/args/order_refund.go | 45 +++++ backend/app/jobs/order_refund.go | 58 +++++++ backend/app/jobs/provider.gen.go | 12 ++ backend/app/services/order.go | 234 ++++++++++++++++++++++---- backend/app/services/order_test.go | 16 +- backend/app/services/provider.gen.go | 3 + 7 files changed, 337 insertions(+), 35 deletions(-) create mode 100644 backend/app/jobs/args/order_refund.go create mode 100644 backend/app/jobs/order_refund.go diff --git a/backend/app/commands/queue/river.go b/backend/app/commands/queue/river.go index 2365127..e9df549 100644 --- a/backend/app/commands/queue/river.go +++ b/backend/app/commands/queue/river.go @@ -9,6 +9,8 @@ import ( "quyun/v2/app/commands" "quyun/v2/app/jobs" + "quyun/v2/app/services" + "quyun/v2/database" "quyun/v2/providers/app" "quyun/v2/providers/job" "quyun/v2/providers/postgres" @@ -22,6 +24,7 @@ func defaultProviders() container.Providers { return commands.Default(container.Providers{ postgres.DefaultProvider(), job.DefaultProvider(), + database.DefaultProvider(), }...) } @@ -34,6 +37,7 @@ func Command() atom.Option { defaultProviders(). With( jobs.Provide, + services.Provide, ), ), ) diff --git a/backend/app/jobs/args/order_refund.go b/backend/app/jobs/args/order_refund.go new file mode 100644 index 0000000..2aa228d --- /dev/null +++ b/backend/app/jobs/args/order_refund.go @@ -0,0 +1,45 @@ +package args + +import ( + "quyun/v2/providers/job" + + . "github.com/riverqueue/river" + "go.ipao.vip/atom/contracts" +) + +var _ contracts.JobArgs = OrderRefundJob{} + +// OrderRefundJob 表示“订单退款处理”的一次性异步任务。 +// +// 设计说明: +// - 该任务用于将订单从 refunding 推进到 refunded/failed(由 worker 执行核心退款逻辑)。 +// - 幂等由 River Unique + 业务侧 DB 幂等(ledger idempotency、状态机)共同保证。 +type OrderRefundJob struct { + // TenantID 租户ID(用于多租户隔离与唯一性维度)。 + TenantID int64 `json:"tenant_id" river:"unique"` + // OrderID 订单ID(用于唯一性维度)。 + OrderID int64 `json:"order_id" river:"unique"` + + // OperatorUserID 退款操作人用户ID(租户管理员/系统),用于 worker 在必要时回填审计字段。 + OperatorUserID int64 `json:"operator_user_id"` + // Force 是否强制退款(绕过时间窗)。 + Force bool `json:"force"` + // Reason 退款原因(用于审计;worker 可用于补齐订单字段)。 + Reason string `json:"reason"` +} + +func (OrderRefundJob) Kind() string { return "order_refund" } + +func (a OrderRefundJob) UniqueID() string { return a.Kind() } + +func (OrderRefundJob) InsertOpts() InsertOpts { + return InsertOpts{ + Queue: job.QueueDefault, + Priority: job.PriorityDefault, + // 失败可重试;由 worker 判断不可重试的场景并 JobCancel。 + MaxAttempts: 10, + UniqueOpts: UniqueOpts{ + ByArgs: true, + }, + } +} diff --git a/backend/app/jobs/order_refund.go b/backend/app/jobs/order_refund.go new file mode 100644 index 0000000..72b010c --- /dev/null +++ b/backend/app/jobs/order_refund.go @@ -0,0 +1,58 @@ +package jobs + +import ( + "context" + "time" + + jobs_args "quyun/v2/app/jobs/args" + "quyun/v2/app/services" + + . "github.com/riverqueue/river" + log "github.com/sirupsen/logrus" +) + +var _ Worker[jobs_args.OrderRefundJob] = (*OrderRefundJobWorker)(nil) + +// OrderRefundJobWorker 负责执行订单退款的异步处理。 +// +// @provider(job) +type OrderRefundJobWorker struct { + WorkerDefaults[jobs_args.OrderRefundJob] +} + +func (w *OrderRefundJobWorker) Work(ctx context.Context, job *Job[jobs_args.OrderRefundJob]) error { + args := job.Args + + logger := log.WithFields(log.Fields{ + "job_kind": args.Kind(), + "tenant_id": args.TenantID, + "order_id": args.OrderID, + "operator_user_id": args.OperatorUserID, + "force": args.Force, + "attempt": job.Attempt, + }) + + // 只允许在异步 worker 层做执行,不要在这里再次入队其他任务(避免耦合与递归依赖)。 + logger.Info("jobs.order_refund.start") + + _, err := services.Order.ProcessRefundingOrder(ctx, &services.ProcessRefundingOrderParams{ + TenantID: args.TenantID, + OrderID: args.OrderID, + OperatorUserID: args.OperatorUserID, + Force: args.Force, + Reason: args.Reason, + Now: time.Now().UTC(), + }) + if err != nil { + // 业务层会返回可识别的“不可重试”错误:由它内部完成状态落库(failed)后,这里直接 cancel。 + if services.IsRefundJobNonRetryableError(err) { + logger.WithError(err).Warn("jobs.order_refund.cancel") + return JobCancel(err) + } + logger.WithError(err).Warn("jobs.order_refund.retry") + return err + } + + logger.Info("jobs.order_refund.ok") + return nil +} diff --git a/backend/app/jobs/provider.gen.go b/backend/app/jobs/provider.gen.go index 95331e5..e374698 100755 --- a/backend/app/jobs/provider.gen.go +++ b/backend/app/jobs/provider.gen.go @@ -37,5 +37,17 @@ func Provide(opts ...opt.Option) error { }, atom.GroupInitial); err != nil { return err } + if err := container.Container.Provide(func( + __job *job.Job, + ) (contracts.Initial, error) { + obj := &OrderRefundJobWorker{} + if err := river.AddWorkerSafely(__job.Workers, obj); err != nil { + return nil, err + } + + return obj, nil + }, atom.GroupInitial); err != nil { + return err + } return nil } diff --git a/backend/app/services/order.go b/backend/app/services/order.go index 5ebe0ed..5dab87f 100644 --- a/backend/app/services/order.go +++ b/backend/app/services/order.go @@ -12,11 +12,13 @@ import ( "quyun/v2/app/errorx" "quyun/v2/app/http/tenant/dto" + jobs_args "quyun/v2/app/jobs/args" "quyun/v2/app/requests" "quyun/v2/database" "quyun/v2/database/fields" "quyun/v2/database/models" "quyun/v2/pkg/consts" + provider_job "quyun/v2/providers/job" pkgerrors "github.com/pkg/errors" "github.com/samber/lo" @@ -41,7 +43,11 @@ func newOrderSnapshot(kind consts.OrderType, payload any) types.JSONType[fields. } // AdminOrderExportCSV 租户管理员导出订单列表(CSV 文本)。 -func (s *order) AdminOrderExportCSV(ctx context.Context, tenantID int64, filter *dto.AdminOrderListFilter) (*dto.AdminOrderExportResponse, error) { +func (s *order) AdminOrderExportCSV( + ctx context.Context, + tenantID int64, + filter *dto.AdminOrderListFilter, +) (*dto.AdminOrderExportResponse, error) { if tenantID <= 0 { return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id must be > 0") } @@ -349,6 +355,43 @@ type PurchaseContentResult struct { type order struct { db *gorm.DB ledger *ledger + job *provider_job.Job +} + +type ProcessRefundingOrderParams struct { + // TenantID 租户ID。 + TenantID int64 + // OrderID 订单ID。 + OrderID int64 + // OperatorUserID 退款操作人用户ID(用于补齐订单审计字段)。 + OperatorUserID int64 + // Force 是否强制退款(用于补齐订单审计字段)。 + Force bool + // Reason 退款原因(用于补齐订单审计字段)。 + Reason string + // Now 逻辑时间(用于 refunded_at/updated_at)。 + Now time.Time +} + +func IsRefundJobNonRetryableError(err error) bool { + var appErr *errorx.AppError + if !errors.As(err, &appErr) { + return false + } + switch appErr.Code { + case errorx.CodeInvalidParameter, + errorx.CodeRecordNotFound, + errorx.CodeStatusConflict, + errorx.CodePreconditionFailed, + errorx.CodePermissionDenied: + return true + default: + return false + } +} + +func (s *order) enqueueOrderRefundJob(args jobs_args.OrderRefundJob) error { + return s.job.Add(args) } // AdminTopupUser 租户管理员给租户成员充值(增加该租户下的可用余额)。 @@ -747,11 +790,15 @@ func (s *order) AdminRefundOrder( return err } - // 状态机:已退款直接幂等返回;仅允许已支付订单退款。 + // 状态机:已退款/退款中直接幂等返回;仅允许已支付订单发起退款请求。 if orderModel.Status == consts.OrderStatusRefunded { out = &orderModel return nil } + if orderModel.Status == consts.OrderStatusRefunding { + out = &orderModel + return nil + } if orderModel.Status != consts.OrderStatusPaid { return errorx.ErrStatusConflict.WithMsg("订单非已支付状态,无法退款") } @@ -767,38 +814,11 @@ func (s *order) AdminRefundOrder( } } - amount := orderModel.AmountPaid - refundKey := fmt.Sprintf("refund:%d", orderModel.ID) - - // 先退余额(账本入账),后更新订单状态与权益,确保退款可对账且可追溯。 - if amount > 0 { - if _, err := s.ledger.CreditRefundTx(ctx, tx, tenantID, operatorUserID, orderModel.UserID, orderModel.ID, amount, refundKey, reason, now); err != nil { - return err - } - } - - // 退款对权益:立即回收 content_access(revoked)。 - for _, item := range orderModel.Items { - if item == nil { - continue - } - if err := tx.Table(models.TableNameContentAccess). - Where("tenant_id = ? AND user_id = ? AND content_id = ?", tenantID, orderModel.UserID, item.ContentID). - Updates(map[string]any{ - "status": consts.ContentAccessStatusRevoked, - "revoked_at": now, - "updated_at": now, - }).Error; err != nil { - return err - } - } - - // 最后更新订单退款字段,保证退款后的最终状态一致。 + // 将订单推进到 refunding,并记录本次请求的审计字段;实际退款逻辑由异步 job 完成。 if err := tx.Table(models.TableNameOrder). Where("id = ?", orderModel.ID). Updates(map[string]any{ - "status": consts.OrderStatusRefunded, - "refunded_at": now, + "status": consts.OrderStatusRefunding, "refund_forced": force, "refund_operator_user_id": operatorUserID, "refund_reason": reason, @@ -807,8 +827,7 @@ func (s *order) AdminRefundOrder( return err } - orderModel.Status = consts.OrderStatusRefunded - orderModel.RefundedAt = now + orderModel.Status = consts.OrderStatusRefunding orderModel.RefundForced = force orderModel.RefundOperatorUserID = operatorUserID orderModel.RefundReason = reason @@ -828,6 +847,30 @@ func (s *order) AdminRefundOrder( return nil, err } + // refunding 状态需要确保异步任务已入队:入队失败则返回错误,调用方可重试(幂等)。 + if out != nil && out.Status == consts.OrderStatusRefunding { + err := s.enqueueOrderRefundJob(jobs_args.OrderRefundJob{ + TenantID: tenantID, + OrderID: out.ID, + OperatorUserID: operatorUserID, + Force: force, + Reason: reason, + }) + if err != nil { + logrus.WithFields(logrus.Fields{ + "tenant_id": tenantID, + "operator_user_id": operatorUserID, + "order_id": out.ID, + }).WithError(err).Warn("services.order.admin.refund.enqueue_failed") + return nil, err + } + logrus.WithFields(logrus.Fields{ + "tenant_id": tenantID, + "operator_user_id": operatorUserID, + "order_id": out.ID, + }).Info("services.order.admin.refund.enqueued") + } + logrus.WithFields(logrus.Fields{ "tenant_id": tenantID, "operator_user_id": operatorUserID, @@ -839,6 +882,131 @@ func (s *order) AdminRefundOrder( return out, nil } +// ProcessRefundingOrder 处理 refunding 订单:退余额、回收权益、推进到 refunded。 +// 供异步 job/worker 调用;需保持幂等。 +func (s *order) ProcessRefundingOrder(ctx context.Context, params *ProcessRefundingOrderParams) (*models.Order, error) { + if params == nil { + return nil, errorx.ErrInvalidParameter.WithMsg("params is required") + } + if params.TenantID <= 0 || params.OrderID <= 0 { + return nil, errorx.ErrInvalidParameter.WithMsg("tenant_id/order_id must be > 0") + } + if params.Now.IsZero() { + params.Now = time.Now() + } + + logrus.WithFields(logrus.Fields{ + "tenant_id": params.TenantID, + "order_id": params.OrderID, + "operator_user_id": params.OperatorUserID, + "force": params.Force, + }).Info("services.order.refund.process") + + var out *models.Order + err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var orderModel models.Order + if err := tx. + Clauses(clause.Locking{Strength: "UPDATE"}). + Preload("Items"). + Where("tenant_id = ? AND id = ?", params.TenantID, params.OrderID). + First(&orderModel).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return errorx.ErrRecordNotFound.WithMsg("order not found") + } + return err + } + + // 幂等:已退款/已失败直接返回。 + if orderModel.Status == consts.OrderStatusRefunded || orderModel.Status == consts.OrderStatusFailed { + out = &orderModel + return nil + } + + // 仅允许 refunding 状态进入处理链路:paid->refunding 必须由接口层完成并记录审计字段。 + if orderModel.Status != consts.OrderStatusRefunding { + // 不可重试:状态不符合预期,直接标记 failed,避免 job 无限重试。 + _ = tx.Table(models.TableNameOrder). + Where("id = ?", orderModel.ID). + Updates(map[string]any{ + "status": consts.OrderStatusFailed, + "updated_at": params.Now, + }).Error + return errorx.ErrStatusConflict.WithMsg("order not in refunding status") + } + + // 补齐审计字段:以订单字段为准;若为空则用 job 参数兜底(避免历史数据/异常路径导致缺失)。 + operatorUserID := orderModel.RefundOperatorUserID + if operatorUserID == 0 { + operatorUserID = params.OperatorUserID + } + reason := orderModel.RefundReason + if strings.TrimSpace(reason) == "" { + reason = strings.TrimSpace(params.Reason) + } + force := orderModel.RefundForced + if !force { + force = params.Force + } + + amount := orderModel.AmountPaid + refundKey := fmt.Sprintf("refund:%d", orderModel.ID) + + // 先退余额(账本入账),后回收权益,最后推进订单终态:保证退款可对账且可追溯。 + if amount > 0 { + if _, err := s.ledger.CreditRefundTx(ctx, tx, params.TenantID, operatorUserID, orderModel.UserID, orderModel.ID, amount, refundKey, reason, params.Now); err != nil { + return err + } + } + + for _, item := range orderModel.Items { + if item == nil { + continue + } + if err := tx.Table(models.TableNameContentAccess). + Where("tenant_id = ? AND user_id = ? AND content_id = ?", params.TenantID, orderModel.UserID, item.ContentID). + Updates(map[string]any{ + "status": consts.ContentAccessStatusRevoked, + "revoked_at": params.Now, + "updated_at": params.Now, + }).Error; err != nil { + return err + } + } + + if err := tx.Table(models.TableNameOrder). + Where("id = ?", orderModel.ID). + Updates(map[string]any{ + "status": consts.OrderStatusRefunded, + "refunded_at": params.Now, + "refund_forced": force, + "refund_operator_user_id": operatorUserID, + "refund_reason": reason, + "updated_at": params.Now, + }).Error; err != nil { + return err + } + + orderModel.Status = consts.OrderStatusRefunded + orderModel.RefundedAt = params.Now + orderModel.RefundForced = force + orderModel.RefundOperatorUserID = operatorUserID + orderModel.RefundReason = reason + orderModel.UpdatedAt = params.Now + + out = &orderModel + return nil + }) + if err != nil { + // 不可重试错误由 worker 负责 JobCancel;这里保持原始 error 以便判定。 + logrus.WithFields(logrus.Fields{ + "tenant_id": params.TenantID, + "order_id": params.OrderID, + }).WithError(err).Warn("services.order.refund.process.failed") + return nil, err + } + return out, nil +} + func (s *order) PurchaseContent(ctx context.Context, params *PurchaseContentParams) (*PurchaseContentResult, error) { if params == nil { return nil, errorx.ErrInvalidParameter.WithMsg("params is required") diff --git a/backend/app/services/order_test.go b/backend/app/services/order_test.go index 5309cd5..5ce9278 100644 --- a/backend/app/services/order_test.go +++ b/backend/app/services/order_test.go @@ -1122,7 +1122,19 @@ func (s *OrderTestSuite) Test_AdminRefundOrder() { } So(access.Create(ctx), ShouldBeNil) - refunded, err := Order.AdminRefundOrder(ctx, tenantID, operatorUserID, orderModel.ID, false, "原因", "", now.Add(time.Minute)) + refunding, err := Order.AdminRefundOrder(ctx, tenantID, operatorUserID, orderModel.ID, false, "原因", "", now.Add(time.Minute)) + So(err, ShouldBeNil) + So(refunding, ShouldNotBeNil) + So(refunding.Status, ShouldEqual, consts.OrderStatusRefunding) + + refunded, err := Order.ProcessRefundingOrder(ctx, &ProcessRefundingOrderParams{ + TenantID: tenantID, + OrderID: orderModel.ID, + OperatorUserID: operatorUserID, + Force: false, + Reason: "原因", + Now: now.Add(2 * time.Minute), + }) So(err, ShouldBeNil) So(refunded, ShouldNotBeNil) So(refunded.Status, ShouldEqual, consts.OrderStatusRefunded) @@ -1136,7 +1148,7 @@ func (s *OrderTestSuite) Test_AdminRefundOrder() { So(access2.Status, ShouldEqual, consts.ContentAccessStatusRevoked) So(access2.RevokedAt.IsZero(), ShouldBeFalse) - refunded2, err := Order.AdminRefundOrder(ctx, tenantID, operatorUserID, orderModel.ID, false, "原因2", "", now.Add(2*time.Minute)) + refunded2, err := Order.AdminRefundOrder(ctx, tenantID, operatorUserID, orderModel.ID, false, "原因2", "", now.Add(3*time.Minute)) So(err, ShouldBeNil) So(refunded2.Status, ShouldEqual, consts.OrderStatusRefunded) diff --git a/backend/app/services/provider.gen.go b/backend/app/services/provider.gen.go index c018fe8..2f2214e 100755 --- a/backend/app/services/provider.gen.go +++ b/backend/app/services/provider.gen.go @@ -1,6 +1,7 @@ package services import ( + provider_job "quyun/v2/providers/job" provider_jwt "quyun/v2/providers/jwt" "go.ipao.vip/atom" @@ -49,10 +50,12 @@ func Provide(opts ...opt.Option) error { } if err := container.Container.Provide(func( db *gorm.DB, + job *provider_job.Job, ledger *ledger, ) (*order, error) { obj := &order{ db: db, + job: job, ledger: ledger, }