From d75fd7906f17287ec90c7917d71024d0626e670e Mon Sep 17 00:00:00 2001 From: yanghao05 Date: Mon, 14 Apr 2025 21:24:02 +0800 Subject: [PATCH] feat: add callback --- backend/app/http/pays.go | 13 +++++++- backend/app/jobs/wechat_callback.go | 49 +++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 backend/app/jobs/wechat_callback.go diff --git a/backend/app/http/pays.go b/backend/app/http/pays.go index 0a5db8f..79449e6 100644 --- a/backend/app/http/pays.go +++ b/backend/app/http/pays.go @@ -4,21 +4,27 @@ import ( "fmt" "net/http" + "quyun/app/jobs" + "quyun/providers/job" "quyun/providers/wepay" "github.com/go-pay/gopay" "github.com/go-pay/gopay/wechat/v3" "github.com/go-pay/util/js" "github.com/gofiber/fiber/v3" + log "github.com/sirupsen/logrus" ) type pays struct { wepay *wepay.Client + job *job.Job } // Callback // @Router /pay/callback [get] func (ctl *pays) Callback(ctx fiber.Ctx) error { + log := log.WithField("method", "pays.Callback") + body := ctx.Body() si := &wechat.SignInfo{ HeaderTimestamp: ctx.Get(wechat.HeaderTimestamp), @@ -29,6 +35,7 @@ func (ctl *pays) Callback(ctx fiber.Ctx) error { } notifyReq := &wechat.V3NotifyReq{SignInfo: si} if err := js.UnmarshalBytes(body, notifyReq); err != nil { + log.Errorf("json unmarshal error:%v", err) return ctx.Status(http.StatusBadRequest).JSON(fiber.Map{"error": fmt.Sprintf("json unmarshal error:%v", err)}) } @@ -38,10 +45,14 @@ func (ctl *pays) Callback(ctx fiber.Ctx) error { // 验证异步通知的签名 err := notifyReq.VerifySignByPKMap(certMap) if err != nil { + log.Errorf("verify sign error:%v", err) return ctx.Status(http.StatusBadRequest).JSON(fiber.Map{"error": "Invalid signature"}) } - // TODO: add process order job + if err := ctl.job.Add(&jobs.WechatCallback{NotifyReq: notifyReq}); err != nil { + log.Errorf("add job error:%v", err) + return ctx.Status(http.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to add job"}) + } return ctx.Status(http.StatusOK).JSON(&wechat.V3NotifyRsp{ Code: gopay.SUCCESS, diff --git a/backend/app/jobs/wechat_callback.go b/backend/app/jobs/wechat_callback.go new file mode 100644 index 0000000..3b4f116 --- /dev/null +++ b/backend/app/jobs/wechat_callback.go @@ -0,0 +1,49 @@ +package jobs + +import ( + "context" + "time" + + "github.com/go-pay/gopay/wechat/v3" + . "github.com/riverqueue/river" + log "github.com/sirupsen/logrus" + _ "go.ipao.vip/atom" + "go.ipao.vip/atom/contracts" + _ "go.ipao.vip/atom/contracts" +) + +var _ contracts.JobArgs = (*WechatCallback)(nil) + +type WechatCallback struct { + NotifyReq *wechat.V3NotifyReq `json:"notify_req"` +} + +func (s WechatCallback) InsertOpts() InsertOpts { + return InsertOpts{ + Queue: QueueDefault, + Priority: PriorityDefault, + } +} + +func (WechatCallback) Kind() string { return "wechat_callback" } +func (a WechatCallback) UniqueID() string { return a.Kind() } + +var _ Worker[WechatCallback] = (*WechatCallbackWorker)(nil) + +// @provider(job) +type WechatCallbackWorker struct { + WorkerDefaults[WechatCallback] +} + +func (w *WechatCallbackWorker) Work(ctx context.Context, job *Job[WechatCallback]) error { + log := log.WithField("job", job.Args.Kind()) + + log.Infof("[Start] Working on job with strings: %+v", job.Args) + defer log.Infof("[End] Finished %s", job.Args.Kind()) + + return nil +} + +func (w *WechatCallbackWorker) NextRetry(job *Job[WechatCallback]) time.Time { + return time.Now().Add(30 * time.Second) +}