reset backend
This commit is contained in:
@@ -1,56 +0,0 @@
|
||||
//go:build legacytests
|
||||
// +build legacytests
|
||||
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"quyun/v2/app/commands/testx"
|
||||
"quyun/v2/app/services"
|
||||
|
||||
. "github.com/riverqueue/river"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
"github.com/stretchr/testify/suite"
|
||||
_ "go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
"go.uber.org/dig"
|
||||
)
|
||||
|
||||
type DemoJobSuiteInjectParams struct {
|
||||
dig.In
|
||||
|
||||
Initials []contracts.Initial `group:"initials"` // nolint:structcheck
|
||||
}
|
||||
|
||||
type DemoJobSuite struct {
|
||||
suite.Suite
|
||||
|
||||
DemoJobSuiteInjectParams
|
||||
}
|
||||
|
||||
func Test_DemoJob(t *testing.T) {
|
||||
providers := testx.Default().With(Provide, services.Provide)
|
||||
|
||||
testx.Serve(providers, t, func(p DemoJobSuiteInjectParams) {
|
||||
suite.Run(t, &DemoJobSuite{DemoJobSuiteInjectParams: p})
|
||||
})
|
||||
}
|
||||
|
||||
func (t *DemoJobSuite) Test_Work() {
|
||||
Convey("test_work", t.T(), func() {
|
||||
Convey("step 1", func() {
|
||||
job := &Job[DemoJob]{
|
||||
Args: DemoJob{
|
||||
Strings: []string{"a", "b", "c"},
|
||||
},
|
||||
}
|
||||
|
||||
worker := &DemoJobWorker{}
|
||||
|
||||
err := worker.Work(context.Background(), job)
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -1,58 +0,0 @@
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
jobs_args "quyun/v2/app/jobs/args"
|
||||
"quyun/v2/app/services"
|
||||
|
||||
. "github.com/riverqueue/river"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var _ Worker[jobs_args.MediaAssetProcessJob] = (*MediaAssetProcessJobWorker)(nil)
|
||||
|
||||
// MediaAssetProcessJobWorker 负责执行媒体资源处理的异步处理(占位实现)。
|
||||
//
|
||||
// 当前实现为 stub:不对接外部转码/截图服务,仅回写 meta 并将状态置为 ready。
|
||||
//
|
||||
// @provider(job)
|
||||
type MediaAssetProcessJobWorker struct {
|
||||
WorkerDefaults[jobs_args.MediaAssetProcessJob]
|
||||
}
|
||||
|
||||
func (w *MediaAssetProcessJobWorker) Work(ctx context.Context, job *Job[jobs_args.MediaAssetProcessJob]) error {
|
||||
args := job.Args
|
||||
|
||||
attempt := 0
|
||||
if job != nil && job.JobRow != nil {
|
||||
attempt = job.Attempt
|
||||
}
|
||||
|
||||
logger := log.WithFields(log.Fields{
|
||||
"job_kind": args.Kind(),
|
||||
"tenant_id": args.TenantID,
|
||||
"asset_id": args.AssetID,
|
||||
"attempt": attempt,
|
||||
})
|
||||
|
||||
logger.Info("jobs.media_asset_process.start")
|
||||
|
||||
_, err := services.MediaAsset.ProcessSuccess(ctx, args.TenantID, args.AssetID, map[string]any{
|
||||
"process_pipeline": "stub",
|
||||
"worker_attempt": attempt,
|
||||
"worker_ran_at": time.Now().UTC().Format(time.RFC3339Nano),
|
||||
}, time.Now().UTC())
|
||||
if err != nil {
|
||||
if services.IsMediaAssetProcessJobNonRetryableError(err) {
|
||||
logger.WithError(err).Warn("jobs.media_asset_process.cancel")
|
||||
return JobCancel(err)
|
||||
}
|
||||
logger.WithError(err).Warn("jobs.media_asset_process.retry")
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Info("jobs.media_asset_process.ok")
|
||||
return nil
|
||||
}
|
||||
@@ -1,85 +0,0 @@
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"quyun/v2/app/commands/testx"
|
||||
jobs_args "quyun/v2/app/jobs/args"
|
||||
"quyun/v2/app/services"
|
||||
"quyun/v2/database"
|
||||
"quyun/v2/database/models"
|
||||
"quyun/v2/pkg/consts"
|
||||
|
||||
"github.com/riverqueue/river"
|
||||
"github.com/riverqueue/river/rivertype"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
_ "go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
"go.ipao.vip/gen/types"
|
||||
"go.uber.org/dig"
|
||||
)
|
||||
|
||||
type MediaAssetProcessJobSuiteInjectParams struct {
|
||||
dig.In
|
||||
|
||||
DB *sql.DB
|
||||
Initials []contracts.Initial `group:"initials"` // nolint:structcheck
|
||||
}
|
||||
|
||||
type MediaAssetProcessJobSuite struct {
|
||||
suite.Suite
|
||||
MediaAssetProcessJobSuiteInjectParams
|
||||
}
|
||||
|
||||
func Test_MediaAssetProcessJob(t *testing.T) {
|
||||
// 注意:testx.Default() 已内置一个测试用的 job worker 注册器,避免和 jobs.Provide 重复注册同 kind worker。
|
||||
providers := testx.Default().With(services.Provide)
|
||||
|
||||
testx.Serve(providers, t, func(p MediaAssetProcessJobSuiteInjectParams) {
|
||||
suite.Run(t, &MediaAssetProcessJobSuite{MediaAssetProcessJobSuiteInjectParams: p})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *MediaAssetProcessJobSuite) Test_Work_ProcessingToReady() {
|
||||
Convey("MediaAssetProcessJobWorker processing -> ready", s.T(), func() {
|
||||
ctx := s.T().Context()
|
||||
now := time.Now().UTC()
|
||||
tenantID := int64(1)
|
||||
userID := int64(2)
|
||||
|
||||
database.Truncate(ctx, s.DB, models.TableNameMediaAsset)
|
||||
|
||||
asset := &models.MediaAsset{
|
||||
TenantID: tenantID,
|
||||
UserID: userID,
|
||||
Type: consts.MediaAssetTypeVideo,
|
||||
Status: consts.MediaAssetStatusProcessing,
|
||||
Provider: "test",
|
||||
Bucket: "b",
|
||||
ObjectKey: "k",
|
||||
Meta: types.JSON([]byte("{}")),
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
So(asset.Create(ctx), ShouldBeNil)
|
||||
|
||||
worker := &MediaAssetProcessJobWorker{}
|
||||
err := worker.Work(ctx, &river.Job[jobs_args.MediaAssetProcessJob]{
|
||||
JobRow: &rivertype.JobRow{Attempt: 1},
|
||||
Args: jobs_args.MediaAssetProcessJob{TenantID: tenantID, AssetID: asset.ID},
|
||||
})
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
tbl, query := models.MediaAssetQuery.QueryContext(ctx)
|
||||
got, err := query.Where(
|
||||
tbl.TenantID.Eq(tenantID),
|
||||
tbl.ID.Eq(asset.ID),
|
||||
).First()
|
||||
So(err, ShouldBeNil)
|
||||
So(got.Status, ShouldEqual, consts.MediaAssetStatusReady)
|
||||
})
|
||||
}
|
||||
@@ -1,62 +0,0 @@
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
jobs_args "quyun/v2/app/jobs/args"
|
||||
"quyun/v2/app/services"
|
||||
|
||||
. "github.com/riverqueue/river"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var _ Worker[jobs_args.OrderRefundJob] = (*OrderRefundJobWorker)(nil)
|
||||
|
||||
// OrderRefundJobWorker 负责执行订单退款的异步处理。
|
||||
//
|
||||
// @provider(job)
|
||||
type OrderRefundJobWorker struct {
|
||||
WorkerDefaults[jobs_args.OrderRefundJob]
|
||||
}
|
||||
|
||||
func (w *OrderRefundJobWorker) Work(ctx context.Context, job *Job[jobs_args.OrderRefundJob]) error {
|
||||
args := job.Args
|
||||
|
||||
logger := log.WithFields(log.Fields{
|
||||
"job_kind": args.Kind(),
|
||||
"tenant_id": args.TenantID,
|
||||
"order_id": args.OrderID,
|
||||
"operator_user_id": args.OperatorUserID,
|
||||
"force": args.Force,
|
||||
"attempt": job.Attempt,
|
||||
})
|
||||
|
||||
// 只允许在异步 worker 层做执行,不要在这里再次入队其他任务(避免耦合与递归依赖)。
|
||||
logger.Info("jobs.order_refund.start")
|
||||
|
||||
_, err := services.Order.ProcessRefundingOrder(ctx, &services.ProcessRefundingOrderParams{
|
||||
TenantID: args.TenantID,
|
||||
OrderID: args.OrderID,
|
||||
OperatorUserID: args.OperatorUserID,
|
||||
Force: args.Force,
|
||||
Reason: args.Reason,
|
||||
Now: time.Now().UTC(),
|
||||
})
|
||||
if err != nil {
|
||||
// 业务层会返回可识别的“不可重试”错误:由它内部完成状态落库(failed)后,这里直接 cancel。
|
||||
if services.IsRefundJobNonRetryableError(err) {
|
||||
// best-effort:将订单标记为 failed,便于管理员重新发起退款(paid/failed -> refunding)。
|
||||
if markErr := services.Order.MarkRefundFailed(ctx, args.TenantID, args.OrderID, time.Now().UTC()); markErr != nil {
|
||||
logger.WithError(markErr).Warn("jobs.order_refund.mark_failed_failed")
|
||||
}
|
||||
logger.WithError(err).Warn("jobs.order_refund.cancel")
|
||||
return JobCancel(err)
|
||||
}
|
||||
logger.WithError(err).Warn("jobs.order_refund.retry")
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Info("jobs.order_refund.ok")
|
||||
return nil
|
||||
}
|
||||
@@ -1,39 +1,9 @@
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"quyun/v2/providers/job"
|
||||
|
||||
"github.com/riverqueue/river"
|
||||
"go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/container"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
"go.ipao.vip/atom/opt"
|
||||
)
|
||||
|
||||
func Provide(opts ...opt.Option) error {
|
||||
if err := container.Container.Provide(func(
|
||||
__job *job.Job,
|
||||
) (contracts.Initial, error) {
|
||||
obj := &MediaAssetProcessJobWorker{}
|
||||
if err := river.AddWorkerSafely(__job.Workers, obj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
}, atom.GroupInitial); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := container.Container.Provide(func(
|
||||
__job *job.Job,
|
||||
) (contracts.Initial, error) {
|
||||
obj := &OrderRefundJobWorker{}
|
||||
if err := river.AddWorkerSafely(__job.Workers, obj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
}, atom.GroupInitial); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user