diff --git a/backend/app/events/publishers/user_register.go b/backend/app/events/publishers/user_register.go index bc5a285..06664ce 100644 --- a/backend/app/events/publishers/user_register.go +++ b/backend/app/events/publishers/user_register.go @@ -2,9 +2,9 @@ package publishers import ( "encoding/json" + "quyun/app/events" "go.ipao.vip/atom/contracts" - "quyun/app/events" ) var _ contracts.EventPublisher = (*UserRegister)(nil) diff --git a/backend/app/events/subscribers/user_register.go b/backend/app/events/subscribers/user_register.go index 8a41a51..53de1b1 100644 --- a/backend/app/events/subscribers/user_register.go +++ b/backend/app/events/subscribers/user_register.go @@ -2,11 +2,11 @@ package subscribers import ( "encoding/json" - - "go.ipao.vip/atom/contracts" "quyun/app/events" "quyun/app/events/publishers" + "go.ipao.vip/atom/contracts" + "github.com/ThreeDotsLabs/watermill/message" "github.com/sirupsen/logrus" ) diff --git a/backend/app/http/admin/uploads.go b/backend/app/http/admin/uploads.go index 5d91720..bd9e720 100644 --- a/backend/app/http/admin/uploads.go +++ b/backend/app/http/admin/uploads.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "path/filepath" - "quyun/app/jobs" "quyun/app/models" "quyun/database/schemas/public/model" diff --git a/backend/app/http/auth.go b/backend/app/http/auth.go index e60b3af..1f286a3 100644 --- a/backend/app/http/auth.go +++ b/backend/app/http/auth.go @@ -2,14 +2,13 @@ package http import ( "net/url" - "time" - "quyun/app/models" "quyun/database/fields" "quyun/database/schemas/public/model" "quyun/pkg/utils" "quyun/providers/jwt" "quyun/providers/wechat" + "time" "github.com/gofiber/fiber/v3" "github.com/pkg/errors" diff --git a/backend/app/http/pays.go b/backend/app/http/pays.go index b0c5666..3eb3468 100644 --- a/backend/app/http/pays.go +++ b/backend/app/http/pays.go @@ -1,16 +1,13 @@ package http import ( - "fmt" "net/http" - "quyun/app/jobs" "quyun/providers/job" "quyun/providers/wepay" "github.com/go-pay/gopay" "github.com/go-pay/gopay/wechat/v3" - "github.com/go-pay/util/js" "github.com/gofiber/fiber/v3" log "github.com/sirupsen/logrus" ) @@ -28,31 +25,13 @@ type pays struct { func (ctl *pays) Callback(ctx fiber.Ctx, channel string) error { log := log.WithField("method", "pays.Callback") - body := ctx.Body() - si := &wechat.SignInfo{ - HeaderTimestamp: ctx.Get(wechat.HeaderTimestamp), - HeaderNonce: ctx.Get(wechat.HeaderNonce), - HeaderSignature: ctx.Get(wechat.HeaderSignature), - HeaderSerial: ctx.Get(wechat.HeaderSerial), - SignBody: string(body), - } - notifyReq := &wechat.V3NotifyReq{SignInfo: si} - if err := js.UnmarshalBytes(body, notifyReq); err != nil { - log.Errorf("json unmarshal error:%v", err) - return ctx.Status(http.StatusBadRequest).JSON(fiber.Map{"error": fmt.Sprintf("json unmarshal error:%v", err)}) - } - - // 获取微信平台证书 - certMap := ctl.wepay.WxPublicKeyMap() - - // 验证异步通知的签名 - err := notifyReq.VerifySignByPKMap(certMap) + notify, err := ctl.wepay.ParseNotify(ctx) if err != nil { - log.Errorf("verify sign error:%v", err) - return ctx.Status(http.StatusBadRequest).JSON(fiber.Map{"error": "Invalid signature"}) + log.Errorf("ParseNotify error:%v", err) + return ctx.Status(http.StatusBadRequest).JSON(fiber.Map{"error": "Failed to parse notify"}) } - if err := ctl.job.Add(&jobs.WechatCallback{NotifyReq: notifyReq}); err != nil { + if err := ctl.job.Add(&jobs.WechatPayNotify{PayNotify: notify}); err != nil { log.Errorf("add job error:%v", err) return ctx.Status(http.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to add job"}) } diff --git a/backend/app/http/posts.go b/backend/app/http/posts.go index 6a5d1be..0520ac7 100644 --- a/backend/app/http/posts.go +++ b/backend/app/http/posts.go @@ -2,14 +2,13 @@ package http import ( _ "embed" - "time" - "quyun/app/models" "quyun/app/requests" "quyun/database/fields" "quyun/database/schemas/public/model" "quyun/providers/ali" "quyun/providers/wepay" + "time" "github.com/go-pay/gopay/wechat/v3" "github.com/gofiber/fiber/v3" diff --git a/backend/app/http/users.go b/backend/app/http/users.go index b801e0a..c33f173 100644 --- a/backend/app/http/users.go +++ b/backend/app/http/users.go @@ -1,11 +1,10 @@ package http import ( - "strings" - "time" - "quyun/app/models" "quyun/database/schemas/public/model" + "strings" + "time" "github.com/gofiber/fiber/v3" ) diff --git a/backend/app/http/wechats.go b/backend/app/http/wechats.go index b1e3e43..c28e350 100644 --- a/backend/app/http/wechats.go +++ b/backend/app/http/wechats.go @@ -1,11 +1,10 @@ package http import ( - "time" - "quyun/app/models" "quyun/database/schemas/public/model" "quyun/providers/wechat" + "time" "github.com/gofiber/fiber/v3" "github.com/gofiber/fiber/v3/log" diff --git a/backend/app/jobs/download_from_alioss.go b/backend/app/jobs/download_from_alioss.go index d6d33b6..9481ce9 100644 --- a/backend/app/jobs/download_from_alioss.go +++ b/backend/app/jobs/download_from_alioss.go @@ -4,12 +4,11 @@ import ( "context" "os" "path/filepath" - "time" - "quyun/app/models" "quyun/providers/ali" "quyun/providers/app" "quyun/providers/job" + "time" . "github.com/riverqueue/river" log "github.com/sirupsen/logrus" @@ -18,7 +17,7 @@ import ( _ "go.ipao.vip/atom/contracts" ) -var _ contracts.JobArgs = (*WechatCallback)(nil) +var _ contracts.JobArgs = (*DownloadFromAliOSS)(nil) type DownloadFromAliOSS struct { MediaHash string `json:"media_hash"` diff --git a/backend/app/jobs/download_from_alioss_test.go b/backend/app/jobs/download_from_alioss_test.go index 2c2d2e3..ce7f211 100644 --- a/backend/app/jobs/download_from_alioss_test.go +++ b/backend/app/jobs/download_from_alioss_test.go @@ -2,13 +2,12 @@ package jobs import ( "context" - "testing" - "quyun/app/models" "quyun/app/service/testx" "quyun/providers/ali" "quyun/providers/app" "quyun/providers/job" + "testing" . "github.com/riverqueue/river" . "github.com/smartystreets/goconvey/convey" diff --git a/backend/app/jobs/provider.gen.go b/backend/app/jobs/provider.gen.go index 0703620..95e4365 100755 --- a/backend/app/jobs/provider.gen.go +++ b/backend/app/jobs/provider.gen.go @@ -147,7 +147,7 @@ func Provide(opts ...opt.Option) error { if err := container.Container.Provide(func( __job *job.Job, ) (contracts.Initial, error) { - obj := &WechatCallbackWorker{} + obj := &WechatPayNotifyWorker{} if err := river.AddWorkerSafely(__job.Workers, obj); err != nil { return nil, err } diff --git a/backend/app/jobs/publish_draft_posts.go b/backend/app/jobs/publish_draft_posts.go index ca36a17..90dab4f 100644 --- a/backend/app/jobs/publish_draft_posts.go +++ b/backend/app/jobs/publish_draft_posts.go @@ -2,8 +2,6 @@ package jobs import ( "context" - "time" - "quyun/app/models" "quyun/database/fields" "quyun/database/schemas/public/model" @@ -11,6 +9,7 @@ import ( "quyun/providers/ali" "quyun/providers/app" "quyun/providers/job" + "time" . "github.com/riverqueue/river" "github.com/samber/lo" diff --git a/backend/app/jobs/video_cut.go b/backend/app/jobs/video_cut.go index bca980b..ff51707 100644 --- a/backend/app/jobs/video_cut.go +++ b/backend/app/jobs/video_cut.go @@ -3,13 +3,12 @@ package jobs import ( "context" "path/filepath" - "time" - "quyun/app/models" "quyun/database/fields" "quyun/pkg/utils" "quyun/providers/app" "quyun/providers/job" + "time" . "github.com/riverqueue/river" log "github.com/sirupsen/logrus" diff --git a/backend/app/jobs/video_extract_head_image.go b/backend/app/jobs/video_extract_head_image.go index 10785d7..8b8dcdc 100644 --- a/backend/app/jobs/video_extract_head_image.go +++ b/backend/app/jobs/video_extract_head_image.go @@ -4,8 +4,6 @@ import ( "context" "os" "path/filepath" - "time" - "quyun/app/models" "quyun/database/fields" "quyun/database/schemas/public/model" @@ -13,6 +11,7 @@ import ( "quyun/providers/ali" "quyun/providers/app" "quyun/providers/job" + "time" . "github.com/riverqueue/river" log "github.com/sirupsen/logrus" diff --git a/backend/app/jobs/video_store_short.go b/backend/app/jobs/video_store_short.go index 0e64f21..7efb02c 100644 --- a/backend/app/jobs/video_store_short.go +++ b/backend/app/jobs/video_store_short.go @@ -3,8 +3,6 @@ package jobs import ( "context" "path/filepath" - "time" - "quyun/app/models" "quyun/database/fields" "quyun/database/schemas/public/model" @@ -12,6 +10,7 @@ import ( "quyun/providers/ali" "quyun/providers/app" "quyun/providers/job" + "time" . "github.com/riverqueue/river" log "github.com/sirupsen/logrus" diff --git a/backend/app/jobs/wechat_callback.go b/backend/app/jobs/wechat_callback.go deleted file mode 100644 index 3b4f116..0000000 --- a/backend/app/jobs/wechat_callback.go +++ /dev/null @@ -1,49 +0,0 @@ -package jobs - -import ( - "context" - "time" - - "github.com/go-pay/gopay/wechat/v3" - . "github.com/riverqueue/river" - log "github.com/sirupsen/logrus" - _ "go.ipao.vip/atom" - "go.ipao.vip/atom/contracts" - _ "go.ipao.vip/atom/contracts" -) - -var _ contracts.JobArgs = (*WechatCallback)(nil) - -type WechatCallback struct { - NotifyReq *wechat.V3NotifyReq `json:"notify_req"` -} - -func (s WechatCallback) InsertOpts() InsertOpts { - return InsertOpts{ - Queue: QueueDefault, - Priority: PriorityDefault, - } -} - -func (WechatCallback) Kind() string { return "wechat_callback" } -func (a WechatCallback) UniqueID() string { return a.Kind() } - -var _ Worker[WechatCallback] = (*WechatCallbackWorker)(nil) - -// @provider(job) -type WechatCallbackWorker struct { - WorkerDefaults[WechatCallback] -} - -func (w *WechatCallbackWorker) Work(ctx context.Context, job *Job[WechatCallback]) error { - log := log.WithField("job", job.Args.Kind()) - - log.Infof("[Start] Working on job with strings: %+v", job.Args) - defer log.Infof("[End] Finished %s", job.Args.Kind()) - - return nil -} - -func (w *WechatCallbackWorker) NextRetry(job *Job[WechatCallback]) time.Time { - return time.Now().Add(30 * time.Second) -} diff --git a/backend/app/jobs/wechat_pay_notify.go b/backend/app/jobs/wechat_pay_notify.go new file mode 100644 index 0000000..4823d11 --- /dev/null +++ b/backend/app/jobs/wechat_pay_notify.go @@ -0,0 +1,108 @@ +package jobs + +import ( + "context" + "fmt" + "quyun/app/models" + "quyun/database/fields" + "quyun/providers/wepay" + "time" + + "github.com/pkg/errors" + . "github.com/riverqueue/river" + log "github.com/sirupsen/logrus" + _ "go.ipao.vip/atom" + "go.ipao.vip/atom/contracts" + _ "go.ipao.vip/atom/contracts" +) + +var _ contracts.JobArgs = (*WechatPayNotify)(nil) + +type WechatPayNotify struct { + PayNotify *wepay.PayNotify `json:"notify_req"` +} + +func (s WechatPayNotify) InsertOpts() InsertOpts { + return InsertOpts{ + Queue: QueueDefault, + Priority: PriorityDefault, + } +} + +func (WechatPayNotify) Kind() string { return "wechat_callback" } +func (a WechatPayNotify) UniqueID() string { return a.Kind() } + +var _ Worker[WechatPayNotify] = (*WechatPayNotifyWorker)(nil) + +// @provider(job) +type WechatPayNotifyWorker struct { + WorkerDefaults[WechatPayNotify] +} + +func (w *WechatPayNotifyWorker) Work(ctx context.Context, job *Job[WechatPayNotify]) error { + log := log.WithField("job", job.Args.Kind()) + + log.Infof("[Start] Working on job with strings: %+v", job.Args) + defer log.Infof("[End] Finished %s", job.Args.Kind()) + + notify := job.Args.PayNotify + + if notify.TradeState != "SUCCESS" { + log.Warnf("TradeState is not SUCCESS for order %s", notify.OutTradeNo) + return JobCancel(errors.New("TradeState is not SUCCESS")) + } + + order, err := models.Orders.GetByOrderNo(context.Background(), notify.OutTradeNo) + if err != nil { + log.Errorf("GetByOrderNo error:%v", err) + return err + } + + if order.Status != fields.OrderStatusPending { + log.Infof("Order %s is paid, processing...", job.Args.PayNotify.OutTradeNo) + return JobCancel(fmt.Errorf("Order already paid, currently status: %d", order.Status)) + } + + needToPay := order.Price * int64(order.Discount) / 100 + + if notify.Amount.Total != needToPay { + log.Errorf("Order %s amount mismatch: expected %d, got %d", job.Args.PayNotify.OutTradeNo, needToPay, notify.Amount.Total) + return fmt.Errorf("amount mismatch for order %s", job.Args.PayNotify.OutTradeNo) + } + + order.TransactionID = notify.TransactionID + order.Currency = notify.Amount.Currency + order.PaymentMethod = notify.TradeType + order.Status = fields.OrderStatusCompleted + order.Meta = fields.ToJson(fields.OrderMeta{ + PayNotify: *notify, + }) + + log.Infof("Updated order details: %+v", order) + tx, err := models.Transaction(ctx) + if err != nil { + return errors.Wrap(err, "Transaction error") + } + defer tx.Rollback() + + if err := models.Users.BuyPosts(context.Background(), order.UserID, order.PostID, order.Price); err != nil { + log.Errorf("BuyPosts error:%v", err) + return errors.Wrap(err, "BuyPosts error") + } + + if err := models.Orders.Update(context.Background(), order); err != nil { + log.Errorf("Update order error:%v", err) + return errors.Wrap(err, "Update order error") + } + if err := tx.Commit(); err != nil { + log.Errorf("Commit error:%v", err) + return errors.Wrap(err, "Commit error") + } + + log.Infof("Successfully processed order %s", notify.OutTradeNo) + return nil +} + +func (w *WechatPayNotifyWorker) NextRetry(job *Job[WechatPayNotify]) time.Time { + return time.Now().Add(30 * time.Second) +} diff --git a/backend/app/jobs/wechat_pay_notify_test.go b/backend/app/jobs/wechat_pay_notify_test.go new file mode 100644 index 0000000..a9b7738 --- /dev/null +++ b/backend/app/jobs/wechat_pay_notify_test.go @@ -0,0 +1,86 @@ +package jobs + +import ( + "context" + "encoding/json" + "quyun/app/models" + "quyun/app/service/testx" + "quyun/providers/ali" + "quyun/providers/app" + "quyun/providers/job" + "quyun/providers/wepay" + "testing" + + . "github.com/riverqueue/river" + . "github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/suite" + _ "go.ipao.vip/atom" + "go.ipao.vip/atom/contracts" + "go.uber.org/dig" +) + +type WechatPayNotifySuiteInjectParams struct { + dig.In + + Initials []contracts.Initial `group:"initials"` // nolint:structcheck + Job *job.Job + Oss *ali.OSSClient + App *app.Config +} + +type WechatPayNotifySuite struct { + suite.Suite + + WechatPayNotifySuiteInjectParams +} + +func Test_WechatPayNotify(t *testing.T) { + providers := testx.Default().With(Provide, models.Provide) + + testx.Serve(providers, t, func(p WechatPayNotifySuiteInjectParams) { + suite.Run(t, &WechatPayNotifySuite{WechatPayNotifySuiteInjectParams: p}) + }) +} + +func (t *WechatPayNotifySuite) Test_Work() { + Convey("test_work", t.T(), func() { + Convey("step 1", func() { + notify := `{ + "mchid": "1702644947", + "appid": "wx47649361b6eba174", + "out_trade_no": "20250430192543", + "transaction_id": "4200002602202504300651871941", + "trade_type": "JSAPI", + "trade_state": "SUCCESS", + "trade_state_desc": "支付成功", + "bank_type": "OTHERS", + "attach": "", + "success_time": "2025-04-30T19:25:51+08:00", + "payer": { + "openid": "o5Bzk644x3LOMJsKSZRlqWin74IU" + }, + "amount": { + "total": 1, + "payer_total": 1, + "currency": "CNY", + "payer_currency": "CNY" + } +}` + + var payNotify wepay.PayNotify + err := json.Unmarshal([]byte(notify), &payNotify) + So(err, ShouldBeNil) + + job := &Job[WechatPayNotify]{ + Args: WechatPayNotify{ + PayNotify: &payNotify, + }, + } + + worker := &WechatPayNotifyWorker{} + + err = worker.Work(context.Background(), job) + So(err, ShouldBeNil) + }) + }) +} diff --git a/backend/app/middlewares/mid_auth.go b/backend/app/middlewares/mid_auth.go index 5744fac..109f36b 100644 --- a/backend/app/middlewares/mid_auth.go +++ b/backend/app/middlewares/mid_auth.go @@ -2,11 +2,10 @@ package middlewares import ( "net/url" - "strings" - "time" - "quyun/app/models" "quyun/pkg/utils" + "strings" + "time" "github.com/gofiber/fiber/v3" log "github.com/sirupsen/logrus" diff --git a/backend/app/models/medias.go b/backend/app/models/medias.go index 2eff3cc..e814557 100644 --- a/backend/app/models/medias.go +++ b/backend/app/models/medias.go @@ -2,12 +2,11 @@ package models import ( "context" - "time" - "quyun/app/requests" "quyun/database/fields" "quyun/database/schemas/public/model" "quyun/database/schemas/public/table" + "time" . "github.com/go-jet/jet/v2/postgres" "github.com/samber/lo" diff --git a/backend/app/models/medias_test.go b/backend/app/models/medias_test.go index 38432e9..6564ee8 100644 --- a/backend/app/models/medias_test.go +++ b/backend/app/models/medias_test.go @@ -5,14 +5,13 @@ import ( "fmt" "math" "math/rand" - "testing" - "time" - "quyun/app/requests" "quyun/app/service/testx" "quyun/database" "quyun/database/schemas/public/model" "quyun/database/schemas/public/table" + "testing" + "time" . "github.com/smartystreets/goconvey/convey" "go.ipao.vip/atom/contracts" diff --git a/backend/app/models/orders.go b/backend/app/models/orders.go index 2e2bbb5..a23a203 100644 --- a/backend/app/models/orders.go +++ b/backend/app/models/orders.go @@ -3,12 +3,11 @@ package models import ( "context" "fmt" - "time" - "quyun/app/requests" "quyun/database/fields" "quyun/database/schemas/public/model" "quyun/database/schemas/public/table" + "time" . "github.com/go-jet/jet/v2/postgres" "github.com/pkg/errors" @@ -243,3 +242,68 @@ func (m *ordersModel) SumAmount(ctx context.Context) (int64, error) { return cnt.Cnt, nil } + +// GetByOrderNo +func (m *ordersModel) GetByOrderNo(ctx context.Context, orderNo string) (*model.Orders, error) { + tbl := table.Orders + stmt := tbl. + SELECT(tbl.AllColumns). + WHERE( + tbl.OrderNo.EQ(String(orderNo)), + ) + m.log.Infof("sql: %s", stmt.DebugSql()) + + var order model.Orders + err := stmt.QueryContext(ctx, db, &order) + if err != nil { + m.log.Errorf("error querying order by orderNo: %v", err) + return nil, err + } + + return &order, nil +} + +// SetTranscationID +func (m *ordersModel) SetTranscationID(ctx context.Context, id int64, transactionID string) error { + tbl := table.Orders + stmt := tbl. + UPDATE(tbl.TransactionID). + SET(String(transactionID)). + WHERE( + tbl.ID.EQ(Int64(id)), + ) + m.log.Infof("sql: %s", stmt.DebugSql()) + + if _, err := stmt.ExecContext(ctx, db); err != nil { + m.log.Errorf("error set order transaction ID: %v", err) + return err + } + return nil +} + +// Update +func (m *ordersModel) Update(ctx context.Context, order *model.Orders) error { + tbl := table.Orders + stmt := tbl. + UPDATE( + tbl.MutableColumns.Except( + tbl.OrderNo, + tbl.Price, + tbl.Discount, + tbl.SubOrderNo, + tbl.PostID, + tbl.UserID, + ), + ). + MODEL(order). + WHERE( + tbl.ID.EQ(Int64(order.ID)), + ) + m.log.Infof("sql: %s", stmt.DebugSql()) + + if _, err := stmt.ExecContext(ctx, db); err != nil { + m.log.Errorf("error updating order: %v", err) + return err + } + return nil +} diff --git a/backend/app/models/orders_test.go b/backend/app/models/orders_test.go index 853d373..ae82758 100644 --- a/backend/app/models/orders_test.go +++ b/backend/app/models/orders_test.go @@ -2,11 +2,10 @@ package models import ( "context" - "testing" - "quyun/app/service/testx" "quyun/database" "quyun/database/schemas/public/table" + "testing" . "github.com/smartystreets/goconvey/convey" "go.ipao.vip/atom/contracts" diff --git a/backend/app/models/posts.go b/backend/app/models/posts.go index c676e91..9371d9b 100644 --- a/backend/app/models/posts.go +++ b/backend/app/models/posts.go @@ -3,11 +3,10 @@ package models import ( "context" "errors" - "time" - "quyun/app/requests" "quyun/database/schemas/public/model" "quyun/database/schemas/public/table" + "time" . "github.com/go-jet/jet/v2/postgres" "github.com/go-jet/jet/v2/qrm" diff --git a/backend/app/models/posts_test.go b/backend/app/models/posts_test.go index c5ea30d..46d49bc 100644 --- a/backend/app/models/posts_test.go +++ b/backend/app/models/posts_test.go @@ -4,13 +4,12 @@ import ( "context" "fmt" "math/rand" - "testing" - "quyun/app/service/testx" "quyun/database" "quyun/database/fields" "quyun/database/schemas/public/model" "quyun/database/schemas/public/table" + "testing" . "github.com/smartystreets/goconvey/convey" "go.ipao.vip/atom/contracts" diff --git a/backend/app/models/users.go b/backend/app/models/users.go index 149feb7..cff9da0 100644 --- a/backend/app/models/users.go +++ b/backend/app/models/users.go @@ -2,12 +2,12 @@ package models import ( "context" - "time" - + "database/sql" "quyun/app/requests" "quyun/database/fields" "quyun/database/schemas/public/model" "quyun/database/schemas/public/table" + "time" . "github.com/go-jet/jet/v2/postgres" "github.com/go-jet/jet/v2/qrm" @@ -16,6 +16,10 @@ import ( "github.com/sirupsen/logrus" ) +func Transaction(ctx context.Context) (*sql.Tx, error) { + return db.Begin() +} + // @provider type usersModel struct { log *logrus.Entry `inject:"false"` @@ -404,3 +408,25 @@ func (m *usersModel) UpdateUserToken(ctx context.Context, id int64, token fields } return nil } + +// BuyPosts +func (m *usersModel) BuyPosts(ctx context.Context, userID, postID, price int64) error { + tbl := table.UserPosts + stmt := tbl. + INSERT(tbl.MutableColumns). + MODEL(&model.UserPosts{ + UserID: userID, + PostID: postID, + Price: price, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }) + + m.log.Infof("sql: %s", stmt.DebugSql()) + + if _, err := stmt.ExecContext(ctx, db); err != nil { + m.log.Errorf("error inserting user post: %v", err) + return err + } + return nil +} diff --git a/backend/app/models/users_test.go b/backend/app/models/users_test.go index 3cf4c57..168d5c5 100644 --- a/backend/app/models/users_test.go +++ b/backend/app/models/users_test.go @@ -2,13 +2,12 @@ package models import ( "context" - "testing" - "quyun/app/service/testx" "quyun/database" "quyun/database/fields" "quyun/database/schemas/public/model" "quyun/database/schemas/public/table" + "testing" "github.com/samber/lo" . "github.com/smartystreets/goconvey/convey" diff --git a/backend/app/service/commands/compress.go b/backend/app/service/commands/compress.go index ad29c65..b95d76d 100644 --- a/backend/app/service/commands/compress.go +++ b/backend/app/service/commands/compress.go @@ -9,12 +9,11 @@ import ( "os" "os/exec" "path/filepath" + "quyun/app/service" "strconv" "strings" "time" - "quyun/app/service" - "go.ipao.vip/atom" "go.ipao.vip/atom/container" diff --git a/backend/app/service/event/event.go b/backend/app/service/event/event.go index 0ee18f2..57b3e08 100644 --- a/backend/app/service/event/event.go +++ b/backend/app/service/event/event.go @@ -2,16 +2,16 @@ package event import ( "context" - - "go.ipao.vip/atom" - "go.ipao.vip/atom/container" - "go.ipao.vip/atom/contracts" "quyun/app/events/subscribers" "quyun/app/service" "quyun/providers/app" "quyun/providers/event" "quyun/providers/postgres" + "go.ipao.vip/atom" + "go.ipao.vip/atom/container" + "go.ipao.vip/atom/contracts" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "go.uber.org/dig" diff --git a/backend/app/service/grpc/grpc.go b/backend/app/service/grpc/grpc.go index 6d10f04..15d7c9a 100644 --- a/backend/app/service/grpc/grpc.go +++ b/backend/app/service/grpc/grpc.go @@ -1,15 +1,16 @@ package grpc import ( - "go.ipao.vip/atom" - "go.ipao.vip/atom/container" - "go.ipao.vip/atom/contracts" "quyun/app/grpc/users" "quyun/app/service" "quyun/providers/app" "quyun/providers/grpc" "quyun/providers/postgres" + "go.ipao.vip/atom" + "go.ipao.vip/atom/container" + "go.ipao.vip/atom/contracts" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "go.uber.org/dig" diff --git a/backend/app/service/http/http.go b/backend/app/service/http/http.go index 00188bc..ba97139 100644 --- a/backend/app/service/http/http.go +++ b/backend/app/service/http/http.go @@ -4,13 +4,10 @@ import ( "context" "mime" "path/filepath" - - appHttp "quyun/app/http" "quyun/app/jobs" "quyun/app/middlewares" "quyun/app/models" "quyun/app/service" - _ "quyun/docs" "quyun/providers/ali" "quyun/providers/app" "quyun/providers/hashids" @@ -22,6 +19,10 @@ import ( "quyun/providers/wechat" "quyun/providers/wepay" + appHttp "quyun/app/http" + + _ "quyun/docs" + "go.ipao.vip/atom" "go.ipao.vip/atom/container" "go.ipao.vip/atom/contracts" diff --git a/backend/app/service/migrate/migrate.go b/backend/app/service/migrate/migrate.go index 33a9f0c..de2579c 100644 --- a/backend/app/service/migrate/migrate.go +++ b/backend/app/service/migrate/migrate.go @@ -3,7 +3,6 @@ package migrate import ( "context" "database/sql" - "quyun/app/service" "quyun/database" "quyun/providers/postgres" diff --git a/backend/app/service/queue/river.go b/backend/app/service/queue/river.go index 498c446..ce6e878 100644 --- a/backend/app/service/queue/river.go +++ b/backend/app/service/queue/river.go @@ -2,16 +2,16 @@ package queue import ( "context" - - "go.ipao.vip/atom" - "go.ipao.vip/atom/container" - "go.ipao.vip/atom/contracts" "quyun/app/jobs" "quyun/app/service" "quyun/providers/app" "quyun/providers/job" "quyun/providers/postgres" + "go.ipao.vip/atom" + "go.ipao.vip/atom/container" + "go.ipao.vip/atom/contracts" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "go.uber.org/dig" diff --git a/backend/app/service/service.go b/backend/app/service/service.go index b064599..7c141c3 100644 --- a/backend/app/service/service.go +++ b/backend/app/service/service.go @@ -1,9 +1,10 @@ package service import ( - "go.ipao.vip/atom/container" "quyun/providers/app" "quyun/providers/event" + + "go.ipao.vip/atom/container" ) func Default(providers ...container.ProviderContainer) container.Providers { diff --git a/backend/app/service/testx/testing.go b/backend/app/service/testx/testing.go index 199de52..a5b98b2 100644 --- a/backend/app/service/testx/testing.go +++ b/backend/app/service/testx/testing.go @@ -2,13 +2,12 @@ package testx import ( "os" - "testing" - "quyun/providers/ali" "quyun/providers/app" "quyun/providers/job" "quyun/providers/postgres" "quyun/providers/wechat" + "testing" "go.ipao.vip/atom" "go.ipao.vip/atom/container" diff --git a/backend/config.yh.toml b/backend/config.yh.toml index 22846c2..14e3554 100644 --- a/backend/config.yh.toml +++ b/backend/config.yh.toml @@ -74,6 +74,12 @@ im8aIZZ9jDKUFxtjVUL0l9fjRsCLAvaBbWw3z4EdtOGuYlnhNCheeSd+/Lzqrb1q pnTiwBHnQCMFFL/rNcz/Mmk= -----END PRIVATE KEY-----""" PublicKeyID="PUB_KEY_ID_0117026449472025041400331572000400" -# JS pay domain -# mp.jdwan.com -# quyun.mp.jdwan.com +PublicKey="""-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxcBzCAfddF4y/e3aT92g +z/DWNNFFdoKUxxSAjBFdq+7cHGL/b6VmHLfbZqUF2JvlGYoVxE/vHWrrtDYzPctN ++IaGqwiPSAjvJTHTlxpxZkLz+9YGynrj9jbl12gY73mo/M1jJqmrERN6ZA5P8oNl +tjNmYNK/H5FLuZVVUilEiWn8XskxGEKiGh0KhMEl3YRPPzguADPck9Ip4tgn4UDt +fUs5UFrzH3A4cpuc1Je3wJ3vqztu3sr+G3LBSXCvYD7EkDhXMHCv01cJBxBN876T +442YAFX94VJ79/xwwmXOgCLz1QegDd6M+Um0l5BkQoOIqDlEkWsOvRo9iOsZ25H9 +kQIDAQAB +-----END PUBLIC KEY-----""" \ No newline at end of file diff --git a/backend/database/fields/orders.go b/backend/database/fields/orders.go index 6ed3bc6..a31f003 100644 --- a/backend/database/fields/orders.go +++ b/backend/database/fields/orders.go @@ -1,7 +1,13 @@ package fields +import ( + "quyun/providers/wepay" +) + // swagger:enum OrderStatus // ENUM( pending, paid, refunding, refunded, cancelled, completed) type OrderStatus int16 -type OrderMeta struct{} +type OrderMeta struct { + PayNotify wepay.PayNotify `json:"pay_notify"` +} diff --git a/backend/go.mod b/backend/go.mod index 835fa06..19df2af 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -1,8 +1,6 @@ module quyun -go 1.23.0 - -toolchain go1.24.0 +go 1.24.0 require ( github.com/ThreeDotsLabs/watermill v1.4.3 @@ -11,6 +9,7 @@ require ( github.com/ThreeDotsLabs/watermill-sql/v3 v3.1.0 github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.2.1 github.com/go-jet/jet/v2 v2.13.0 + github.com/go-pay/errgroup v0.0.3 github.com/go-pay/gopay v1.5.110 github.com/go-pay/util v0.0.4 github.com/gofiber/fiber/v3 v3.0.0-beta.4 @@ -83,7 +82,6 @@ require ( github.com/go-openapi/spec v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/go-pay/crypto v0.0.1 // indirect - github.com/go-pay/errgroup v0.0.3 // indirect github.com/go-pay/smap v0.0.2 // indirect github.com/go-pay/xlog v0.0.3 // indirect github.com/go-pay/xtime v0.0.2 // indirect diff --git a/backend/go.sum b/backend/go.sum index 9745d74..f4eab1c 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -256,8 +256,6 @@ github.com/riverqueue/river/rivertype v0.15.0 h1:+TXRnvQv1ulV24uQnsuZmbb3yJdmbpi github.com/riverqueue/river/rivertype v0.15.0/go.mod h1:4vpt5ZSdZ35mFbRAV4oXgeRdH3Mq5h1pUzQTvaGfCUA= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/rogeecn/fabfile v1.4.0 h1:Rw7/7OH8cV4aRPw79Oa4hHHFKaC/ol+sNmGcB/usHaQ= -github.com/rogeecn/fabfile v1.4.0/go.mod h1:EPwX7TtVcIWSLJkJAqxSzYjM/aV1Q0wymcaXqnMgzas= github.com/rogeecn/fabfile v1.5.0 h1:I4yxvNRzsjEKMyq3opc32mmaBlfnZBjBUpjdMRR2zOM= github.com/rogeecn/fabfile v1.5.0/go.mod h1:EPwX7TtVcIWSLJkJAqxSzYjM/aV1Q0wymcaXqnMgzas= github.com/rogeecn/swag v1.0.1 h1:s1yxLgopqO1m8sqGjVmt6ocMBRubMPIh2JtIPG4xjQE= diff --git a/backend/providers/cmux/config.go b/backend/providers/cmux/config.go index 74a1254..ef172c1 100644 --- a/backend/providers/cmux/config.go +++ b/backend/providers/cmux/config.go @@ -2,7 +2,6 @@ package cmux import ( "fmt" - "quyun/providers/grpc" "quyun/providers/http" diff --git a/backend/providers/cmux/provider.go b/backend/providers/cmux/provider.go index 5a77114..79fc6b7 100644 --- a/backend/providers/cmux/provider.go +++ b/backend/providers/cmux/provider.go @@ -2,7 +2,6 @@ package cmux import ( "net" - "quyun/providers/grpc" "quyun/providers/http" diff --git a/backend/providers/job/provider.go b/backend/providers/job/provider.go index d43b02c..66a53f4 100644 --- a/backend/providers/job/provider.go +++ b/backend/providers/job/provider.go @@ -2,9 +2,8 @@ package job import ( "context" - "sync" - "quyun/providers/postgres" + "sync" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" diff --git a/backend/providers/req/client.go b/backend/providers/req/client.go index e4c8fa0..58a0265 100644 --- a/backend/providers/req/client.go +++ b/backend/providers/req/client.go @@ -5,12 +5,11 @@ import ( "net/url" "os" "path/filepath" + "quyun/providers/req/cookiejar" "strconv" "strings" "time" - "quyun/providers/req/cookiejar" - "github.com/imroc/req/v3" "go.ipao.vip/atom/container" "go.ipao.vip/atom/opt" diff --git a/backend/providers/wechat/wechat.go b/backend/providers/wechat/wechat.go index 0fc874f..2304e56 100644 --- a/backend/providers/wechat/wechat.go +++ b/backend/providers/wechat/wechat.go @@ -5,12 +5,11 @@ import ( "encoding/hex" "fmt" "net/url" + "quyun/pkg/oauth" "sort" "strings" "time" - "quyun/pkg/oauth" - "github.com/imroc/req/v3" "github.com/pkg/errors" ) diff --git a/backend/providers/wepay/config.go b/backend/providers/wepay/config.go index aad1637..d0a214c 100644 --- a/backend/providers/wepay/config.go +++ b/backend/providers/wepay/config.go @@ -1,6 +1,8 @@ package wepay import ( + "time" + "go.ipao.vip/atom/container" "go.ipao.vip/atom/opt" ) @@ -15,3 +17,25 @@ func DefaultProvider() container.ProviderContainer { }, } } + +type PayNotify struct { + Mchid string `json:"mchid"` + Appid string `json:"appid"` + OutTradeNo string `json:"out_trade_no"` + TransactionID string `json:"transaction_id"` + TradeType string `json:"trade_type"` + TradeState string `json:"trade_state"` + TradeStateDesc string `json:"trade_state_desc"` + BankType string `json:"bank_type"` + Attach string `json:"attach"` + SuccessTime time.Time `json:"success_time"` + Payer struct { + Openid string `json:"openid"` + } `json:"payer"` + Amount struct { + Total int64 `json:"total"` + PayerTotal int64 `json:"payer_total"` + Currency string `json:"currency"` + PayerCurrency string `json:"payer_currency"` + } `json:"amount"` +} diff --git a/backend/providers/wepay/pay.go b/backend/providers/wepay/pay.go index 6378523..115833a 100644 --- a/backend/providers/wepay/pay.go +++ b/backend/providers/wepay/pay.go @@ -4,12 +4,16 @@ import ( "context" "crypto/rsa" "encoding/json" + "fmt" + "net/http" "time" w "quyun/providers/wechat" "github.com/go-pay/gopay" "github.com/go-pay/gopay/wechat/v3" + "github.com/go-pay/util/js" + "github.com/gofiber/fiber/v3" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "go.ipao.vip/atom/container" @@ -101,6 +105,42 @@ func (c *Client) V3TransactionJsapi(ctx context.Context, f func(*BodyMap)) (*Pre }, nil } +func (c *Client) ParseNotify(ctx fiber.Ctx) (*PayNotify, error) { + body := ctx.Body() + si := &wechat.SignInfo{ + HeaderTimestamp: ctx.Get(wechat.HeaderTimestamp), + HeaderNonce: ctx.Get(wechat.HeaderNonce), + HeaderSignature: ctx.Get(wechat.HeaderSignature), + HeaderSerial: ctx.Get(wechat.HeaderSerial), + SignBody: string(body), + } + notifyReq := &wechat.V3NotifyReq{SignInfo: si} + if err := js.UnmarshalBytes(body, notifyReq); err != nil { + log.Errorf("json unmarshal error:%v", err) + return nil, ctx.Status(http.StatusBadRequest).JSON(fiber.Map{"error": fmt.Sprintf("json unmarshal error:%v", err)}) + } + + // 获取微信平台证书 + certMap := c.WxPublicKeyMap() + + // 验证异步通知的签名 + err := notifyReq.VerifySignByPKMap(certMap) + if err != nil { + log.Errorf("verify sign error:%v", err) + return nil, ctx.Status(http.StatusBadRequest).JSON(fiber.Map{"error": "Invalid signature"}) + } + + var notifyData PayNotify + err = notifyReq.DecryptCipherTextToStruct(c.config.Pay.ApiV3Key, ¬ifyData) + if err != nil { + return nil, ctx.Status(http.StatusBadRequest).JSON(fiber.Map{"error": "Invalid cipher text"}) + } + + log.Infof("Successfully decrypted cipher text for notify data: %+v", notifyData) + + return ¬ifyData, nil +} + type BodyMap struct { bm gopay.BodyMap } diff --git a/backend/providers/wepay/pay_test.go b/backend/providers/wepay/pay_test.go index 2d470b0..5776cf1 100644 --- a/backend/providers/wepay/pay_test.go +++ b/backend/providers/wepay/pay_test.go @@ -3,11 +3,12 @@ package wepay import ( "context" "fmt" + "quyun/app/service/testx" "testing" "time" - "quyun/app/service/testx" - + "github.com/go-pay/gopay/wechat/v3" + "github.com/go-pay/util/js" . "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/suite" "go.ipao.vip/atom/contracts" @@ -53,3 +54,22 @@ func (s *WePayTestSuite) Test_PrePay() { }) }) } + +func (s *WePayTestSuite) Test_parseNotify() { + Convey("parse notify", s.T(), func() { + Convey("prepay", func() { + content := `{"id":"43d17a94-eb1e-5641-bb11-f59e5b6e8749","summary":"支付成功","resource":{"nonce":"avbpSc2seCN5","algorithm":"AEAD_AES_256_GCM","ciphertext":"20VGA2uItmbqFvGBxBug2K3eORRyy/xYswoDA7v4+Yi2ArHnXCXzScVn6kD3ZVpKLiFY7zcTPpTxk2JFJF3vG/6WGG7uuD8DDK7keJk0PZoAfvmSPskQzieOVz3Tgmqp3SkE74mJHX1MeMZHMXMmzMJ4Mp1OmYD2YpiWsF7jlAtiGqxHSC//YlKGaJ/9r0QG4TwZcFpm+X4qkdBNX+DcSCjYeXGyWIm2bVujj63rO43DEA5x0nytdBSrpup/T85khZzNVue1EcyF5XY7PguePU3Q2o+e1c/LnoL9nN7S+n2ljm+nN3uCAhz8eqkPn4uowiq37Tw4JZ2rx2rXCb9jYKmt+I8JHpOij4SgX6oQd7fLeZHsbHC/05s0A1qdLzeF5AKgrAOQT/T1yQ+LsWTnY2ftXAP6mnqGE8Z+vQm5PGo8xsQ8AycVaAhwaRLFvn/XtwlkumfuduAojimFRSNElWwHcApnT+ekqzBrKnAvKo8hdeygf9QWHENcNWVwqwjUWIHe/fGWgJbc6u595bEHb4MkcI8ESD/6bpay/Wk6SyvZCJHqS1WWaPaU0xh9","original_type":"transaction","associated_data":"transaction"},"event_type":"TRANSACTION.SUCCESS","create_time":"2025-04-30T19:25:51+08:00","resource_type":"encrypt-resource"}` + var notifyReq wechat.V3NotifyReq + err := js.UnmarshalBytes([]byte(content), ¬ifyReq) + So(err, ShouldBeNil) + + s.T().Logf("notifyReq: %+v", notifyReq) + + var obj struct{} + err = notifyReq.DecryptCipherTextToStruct("5UBDkxVDY44AKafkqN6YgYxgtkXP6Mw6", &obj) + So(err, ShouldBeNil) + + s.T().Logf("Decrypted object: %+v", obj) + }) + }) +} diff --git a/backend/quyun b/backend/quyun index b3c29da..e1ea1f5 100755 Binary files a/backend/quyun and b/backend/quyun differ