init
This commit is contained in:
0
app/console/.gitkeep
Normal file
0
app/console/.gitkeep
Normal file
152
app/errorx/error.go
Normal file
152
app/errorx/error.go
Normal file
@@ -0,0 +1,152 @@
|
||||
package errorx
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"strings"
|
||||
|
||||
"github.com/go-jet/jet/v2/qrm"
|
||||
"github.com/gofiber/fiber/v3"
|
||||
"github.com/gofiber/fiber/v3/binder"
|
||||
"github.com/gofiber/utils/v2"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func Middleware(c fiber.Ctx) error {
|
||||
err := c.Next()
|
||||
if err != nil {
|
||||
return Wrap(err).Response(c)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
type Response struct {
|
||||
isFormat bool
|
||||
err error
|
||||
params []any
|
||||
sql string
|
||||
file string
|
||||
|
||||
StatusCode int `json:"-" xml:"-"`
|
||||
Code int `json:"code" xml:"code"`
|
||||
Message string `json:"message" xml:"message"`
|
||||
Data any `json:"data,omitempty" xml:"data"`
|
||||
}
|
||||
|
||||
func New(code, statusCode int, message string) *Response {
|
||||
return &Response{
|
||||
isFormat: true,
|
||||
StatusCode: statusCode,
|
||||
Code: code,
|
||||
Message: message,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Response) WithMsg(msg string) *Response {
|
||||
r.Message = msg
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Response) Sql(sql string) *Response {
|
||||
r.sql = sql
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Response) from(err *Response) *Response {
|
||||
r.Code = err.Code
|
||||
r.Message = err.Message
|
||||
r.StatusCode = err.StatusCode
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Response) Params(params ...any) *Response {
|
||||
r.params = params
|
||||
if _, file, line, ok := runtime.Caller(1); ok {
|
||||
r.file = fmt.Sprintf("%s:%d", file, line)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func Wrap(err error) *Response {
|
||||
if e, ok := err.(*Response); ok {
|
||||
return e
|
||||
}
|
||||
return &Response{err: err}
|
||||
}
|
||||
|
||||
func (r *Response) Wrap(err error) *Response {
|
||||
r.err = err
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Response) format() {
|
||||
r.isFormat = true
|
||||
if errors.Is(r.err, qrm.ErrNoRows) {
|
||||
r.from(RecordNotExists)
|
||||
return
|
||||
}
|
||||
|
||||
if e, ok := r.err.(*fiber.Error); ok {
|
||||
r.Code = e.Code
|
||||
r.Message = e.Message
|
||||
r.StatusCode = e.Code
|
||||
return
|
||||
}
|
||||
|
||||
if r.err != nil {
|
||||
msg := r.err.Error()
|
||||
if strings.Contains(msg, "duplicate key value") || strings.Contains(msg, "unique constraint") {
|
||||
r.from(RecordDuplicated)
|
||||
return
|
||||
}
|
||||
|
||||
r.Code = http.StatusInternalServerError
|
||||
r.StatusCode = http.StatusInternalServerError
|
||||
r.Message = msg
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (r *Response) Error() string {
|
||||
if !r.isFormat {
|
||||
r.format()
|
||||
}
|
||||
|
||||
return fmt.Sprintf("[%d] %s", r.Code, r.Message)
|
||||
}
|
||||
|
||||
func (r *Response) Response(ctx fiber.Ctx) error {
|
||||
if !r.isFormat {
|
||||
r.format()
|
||||
}
|
||||
|
||||
contentType := utils.ToLower(utils.UnsafeString(ctx.Request().Header.ContentType()))
|
||||
contentType = binder.FilterFlags(utils.ParseVendorSpecificContentType(contentType))
|
||||
|
||||
log.
|
||||
WithError(r.err).
|
||||
WithField("file", r.file).
|
||||
WithField("sql", r.sql).
|
||||
WithField("params", r.params).
|
||||
Errorf("response error: %+v", r)
|
||||
|
||||
// Parse body accordingly
|
||||
switch contentType {
|
||||
case fiber.MIMETextXML, fiber.MIMEApplicationXML:
|
||||
return ctx.Status(r.StatusCode).XML(r)
|
||||
case fiber.MIMETextHTML, fiber.MIMETextPlain:
|
||||
return ctx.Status(r.StatusCode).SendString(r.Message)
|
||||
default:
|
||||
return ctx.Status(r.StatusCode).JSON(r)
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
RecordDuplicated = New(1001, http.StatusBadRequest, "记录重复")
|
||||
RecordNotExists = New(http.StatusNotFound, http.StatusNotFound, "记录不存在")
|
||||
BadRequest = New(http.StatusBadRequest, http.StatusBadRequest, "请求错误")
|
||||
Unauthorized = New(http.StatusUnauthorized, http.StatusUnauthorized, "未授权")
|
||||
InternalErr = New(http.StatusInternalServerError, http.StatusInternalServerError, "内部错误")
|
||||
)
|
||||
22
app/events/publishers/user_register.go
Normal file
22
app/events/publishers/user_register.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package publishers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"go.ipao.vip/atom/contracts"
|
||||
"quyun/app/events"
|
||||
)
|
||||
|
||||
var _ contracts.EventPublisher = (*UserRegister)(nil)
|
||||
|
||||
type UserRegister struct {
|
||||
ID int64 `json:"id"`
|
||||
}
|
||||
|
||||
func (e *UserRegister) Marshal() ([]byte, error) {
|
||||
return json.Marshal(e)
|
||||
}
|
||||
|
||||
func (e *UserRegister) Topic() string {
|
||||
return events.TopicUserRegister
|
||||
}
|
||||
27
app/events/subscribers/provider.gen.go
Executable file
27
app/events/subscribers/provider.gen.go
Executable file
@@ -0,0 +1,27 @@
|
||||
package subscribers
|
||||
|
||||
import (
|
||||
"quyun/providers/event"
|
||||
|
||||
"go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/container"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
"go.ipao.vip/atom/opt"
|
||||
)
|
||||
|
||||
func Provide(opts ...opt.Option) error {
|
||||
if err := container.Container.Provide(func(
|
||||
__event *event.PubSub,
|
||||
) (contracts.Initial, error) {
|
||||
obj := &UserRegister{}
|
||||
if err := obj.Prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
__event.Handle("handler:UserRegister", obj.Topic(), obj.PublishToTopic(), obj.Handler)
|
||||
|
||||
return obj, nil
|
||||
}, atom.GroupInitial); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
46
app/events/subscribers/user_register.go
Normal file
46
app/events/subscribers/user_register.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package subscribers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"go.ipao.vip/atom/contracts"
|
||||
"quyun/app/events"
|
||||
"quyun/app/events/publishers"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var _ contracts.EventHandler = (*UserRegister)(nil)
|
||||
|
||||
// @provider(event)
|
||||
type UserRegister struct {
|
||||
log *logrus.Entry `inject:"false"`
|
||||
}
|
||||
|
||||
func (e *UserRegister) Prepare() error {
|
||||
e.log = logrus.WithField("module", "events.subscribers.user_register")
|
||||
return nil
|
||||
}
|
||||
|
||||
// PublishToTopic implements contracts.EventHandler.
|
||||
func (e *UserRegister) PublishToTopic() string {
|
||||
return events.TopicProcessed
|
||||
}
|
||||
|
||||
// Topic implements contracts.EventHandler.
|
||||
func (e *UserRegister) Topic() string {
|
||||
return events.TopicUserRegister
|
||||
}
|
||||
|
||||
// Handler implements contracts.EventHandler.
|
||||
func (e *UserRegister) Handler(msg *message.Message) ([]*message.Message, error) {
|
||||
var payload publishers.UserRegister
|
||||
err := json.Unmarshal(msg.Payload, &payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.log.Infof("received event %s", msg.Payload)
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
6
app/events/topics.go
Normal file
6
app/events/topics.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package events
|
||||
|
||||
const (
|
||||
TopicProcessed = "event:processed"
|
||||
TopicUserRegister = "event:user_register"
|
||||
)
|
||||
26
app/grpc/users/handler.go
Normal file
26
app/grpc/users/handler.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package users
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
userv1 "quyun/pkg/proto/user/v1"
|
||||
)
|
||||
|
||||
// @provider(grpc) userv1.RegisterUserServiceServer
|
||||
type Users struct {
|
||||
userv1.UnimplementedUserServiceServer
|
||||
}
|
||||
|
||||
func (u *Users) ListUsers(ctx context.Context, in *userv1.ListUsersRequest) (*userv1.ListUsersResponse, error) {
|
||||
// userv1.UserServiceServer
|
||||
return &userv1.ListUsersResponse{}, nil
|
||||
}
|
||||
|
||||
// GetUser implements userv1.UserServiceServer
|
||||
func (u *Users) GetUser(ctx context.Context, in *userv1.GetUserRequest) (*userv1.GetUserResponse, error) {
|
||||
return &userv1.GetUserResponse{
|
||||
User: &userv1.User{
|
||||
Id: in.Id,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
25
app/grpc/users/provider.gen.go
Executable file
25
app/grpc/users/provider.gen.go
Executable file
@@ -0,0 +1,25 @@
|
||||
package users
|
||||
|
||||
import (
|
||||
userv1 "quyun/pkg/proto/user/v1"
|
||||
"quyun/providers/grpc"
|
||||
|
||||
"go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/container"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
"go.ipao.vip/atom/opt"
|
||||
)
|
||||
|
||||
func Provide(opts ...opt.Option) error {
|
||||
if err := container.Container.Provide(func(
|
||||
__grpc *grpc.Grpc,
|
||||
) (contracts.Initial, error) {
|
||||
obj := &Users{}
|
||||
userv1.RegisterUserServiceServer(__grpc.Server, obj)
|
||||
|
||||
return obj, nil
|
||||
}, atom.GroupInitial); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
0
app/http/.gitkeep
Normal file
0
app/http/.gitkeep
Normal file
0
app/http/.help.md
Normal file
0
app/http/.help.md
Normal file
18
app/http/medias.go
Normal file
18
app/http/medias.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"quyun/app/models"
|
||||
"quyun/app/requests"
|
||||
|
||||
"github.com/gofiber/fiber/v3"
|
||||
)
|
||||
|
||||
// @provider
|
||||
type medias struct{}
|
||||
|
||||
// List medias
|
||||
// @Router /v1/medias [get]
|
||||
// @Bind pagination query
|
||||
func (ctl *medias) List(ctx fiber.Ctx, pagination *requests.Pagination) (*requests.Pager, error) {
|
||||
return models.Medias.List(ctx.Context(), pagination)
|
||||
}
|
||||
48
app/http/provider.gen.go
Executable file
48
app/http/provider.gen.go
Executable file
@@ -0,0 +1,48 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"quyun/providers/app"
|
||||
|
||||
"go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/container"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
"go.ipao.vip/atom/opt"
|
||||
)
|
||||
|
||||
func Provide(opts ...opt.Option) error {
|
||||
if err := container.Container.Provide(func() (*medias, error) {
|
||||
obj := &medias{}
|
||||
|
||||
return obj, nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := container.Container.Provide(func(
|
||||
medias *medias,
|
||||
uploads *uploads,
|
||||
) (contracts.HttpRoute, error) {
|
||||
obj := &Routes{
|
||||
medias: medias,
|
||||
uploads: uploads,
|
||||
}
|
||||
if err := obj.Prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
}, atom.GroupRoutes); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := container.Container.Provide(func(
|
||||
app *app.Config,
|
||||
) (*uploads, error) {
|
||||
obj := &uploads{
|
||||
app: app,
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
52
app/http/routes.gen.go
Normal file
52
app/http/routes.gen.go
Normal file
@@ -0,0 +1,52 @@
|
||||
// Code generated by the atomctl ; DO NOT EDIT.
|
||||
|
||||
package http
|
||||
|
||||
import (
|
||||
"github.com/gofiber/fiber/v3"
|
||||
log "github.com/sirupsen/logrus"
|
||||
_ "go.ipao.vip/atom"
|
||||
_ "go.ipao.vip/atom/contracts"
|
||||
. "go.ipao.vip/atom/fen"
|
||||
"mime/multipart"
|
||||
"quyun/app/requests"
|
||||
)
|
||||
|
||||
// @provider contracts.HttpRoute atom.GroupRoutes
|
||||
type Routes struct {
|
||||
log *log.Entry `inject:"false"`
|
||||
medias *medias
|
||||
uploads *uploads
|
||||
}
|
||||
|
||||
func (r *Routes) Prepare() error {
|
||||
r.log = log.WithField("module", "routes.http")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Routes) Name() string {
|
||||
return "http"
|
||||
}
|
||||
|
||||
func (r *Routes) Register(router fiber.Router) {
|
||||
// 注册路由组: medias
|
||||
router.Get("/v1/medias", DataFunc1(
|
||||
r.medias.List,
|
||||
Query[requests.Pagination]("pagination"),
|
||||
))
|
||||
|
||||
// 注册路由组: uploads
|
||||
router.Post("/v1/uploads/:md5/chunks/:idx", Func3(
|
||||
r.uploads.Chunks,
|
||||
PathParam[string]("md5"),
|
||||
PathParam[string]("idx"),
|
||||
File[multipart.FileHeader]("file"),
|
||||
))
|
||||
|
||||
router.Post("/v1/uploads/:md5/complete", Func2(
|
||||
r.uploads.Complete,
|
||||
PathParam[string]("md5"),
|
||||
Body[UploadFileInfo]("body"),
|
||||
))
|
||||
|
||||
}
|
||||
119
app/http/uploads.go
Normal file
119
app/http/uploads.go
Normal file
@@ -0,0 +1,119 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"mime/multipart"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"quyun/pkg/utils"
|
||||
"quyun/providers/app"
|
||||
|
||||
"github.com/gofiber/fiber/v3"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// @provider
|
||||
type uploads struct {
|
||||
app *app.Config
|
||||
}
|
||||
|
||||
func (up *uploads) storagePath() string {
|
||||
return filepath.Join(up.app.StoragePath, "uploads/tmp")
|
||||
}
|
||||
|
||||
type UploadChunk struct {
|
||||
Chunk int `query:"chunk"`
|
||||
Md5 string `query:"md5"`
|
||||
}
|
||||
|
||||
type UploadFileInfo struct {
|
||||
Md5 string `json:"md5"`
|
||||
Filename string `json:"filename"`
|
||||
Mime string `json:"mime"`
|
||||
Size int64 `json:"size"`
|
||||
Chunks int `json:"chunks"`
|
||||
}
|
||||
|
||||
// Upload chunks
|
||||
// @Router /v1/uploads/:md5/chunks/:idx [post]
|
||||
// @Bind md5 path
|
||||
// @Bind idx path
|
||||
// @Bind file file
|
||||
func (up *uploads) Chunks(ctx fiber.Ctx, md5, idx string, file *multipart.FileHeader) error {
|
||||
tmpPath := filepath.Join(up.storagePath(), md5, idx)
|
||||
|
||||
// if tmpPath not exists, create it
|
||||
if _, err := os.Stat(tmpPath); os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(filepath.Dir(tmpPath), os.ModePerm); err != nil {
|
||||
log.WithError(err).Errorf("create tmpPath failed %s", tmpPath)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// save file to tmpPath
|
||||
if err := ctx.SaveFile(file, tmpPath); err != nil {
|
||||
log.WithError(err).Errorf("save file to tmpPath failed %s", tmpPath)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Complete uploads
|
||||
// @Router /v1/uploads/:md5/complete [post]
|
||||
// @Bind md5 path
|
||||
// @Bind body body
|
||||
func (up *uploads) Complete(ctx fiber.Ctx, md5 string, body *UploadFileInfo) error {
|
||||
// merge chunks
|
||||
path := filepath.Join(up.storagePath(), md5)
|
||||
defer os.RemoveAll(path)
|
||||
|
||||
targetFile := filepath.Join(up.storagePath(), md5, body.Filename)
|
||||
|
||||
// if targetFile not exists, create it
|
||||
tf, err := os.Create(targetFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 0; i < body.Chunks; i++ {
|
||||
tmpPath := filepath.Join(up.storagePath(), md5, fmt.Sprintf("%d", i))
|
||||
|
||||
// open chunk file
|
||||
chunkFile, err := os.Open(tmpPath)
|
||||
if err != nil {
|
||||
tf.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
// copy chunk file to target file
|
||||
if _, err := tf.ReadFrom(chunkFile); err != nil {
|
||||
chunkFile.Close()
|
||||
tf.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
chunkFile.Close()
|
||||
}
|
||||
tf.Close()
|
||||
|
||||
// validate md5
|
||||
ok, err := utils.CompareFileMd5(targetFile, md5)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
return errors.New("md5 not match")
|
||||
}
|
||||
|
||||
// save file to target path
|
||||
targetPath := filepath.Join(up.storagePath(), body.Filename)
|
||||
|
||||
if err := os.Rename(targetFile, targetPath); err != nil {
|
||||
return err
|
||||
}
|
||||
// TODO: save file to database
|
||||
|
||||
return nil
|
||||
}
|
||||
36
app/jobs/demo_cron.go
Normal file
36
app/jobs/demo_cron.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
. "github.com/riverqueue/river"
|
||||
"github.com/sirupsen/logrus"
|
||||
_ "go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
)
|
||||
|
||||
var _ contracts.CronJob = (*CronJob)(nil)
|
||||
|
||||
// @provider(cronjob)
|
||||
type CronJob struct {
|
||||
log *logrus.Entry `inject:"false"`
|
||||
}
|
||||
|
||||
// Prepare implements contracts.CronJob.
|
||||
func (CronJob) Prepare() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// JobArgs implements contracts.CronJob.
|
||||
func (CronJob) Args() []contracts.CronJobArg {
|
||||
return []contracts.CronJobArg{
|
||||
{
|
||||
Arg: SortArgs{
|
||||
Strings: []string{"a", "b", "c", "d"},
|
||||
},
|
||||
|
||||
PeriodicInterval: PeriodicInterval(time.Second * 10),
|
||||
RunOnStart: false,
|
||||
},
|
||||
}
|
||||
}
|
||||
47
app/jobs/demo_job.go
Normal file
47
app/jobs/demo_job.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
. "github.com/riverqueue/river"
|
||||
log "github.com/sirupsen/logrus"
|
||||
_ "go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
_ "go.ipao.vip/atom/contracts"
|
||||
)
|
||||
|
||||
var _ contracts.JobArgs = SortArgs{}
|
||||
|
||||
type SortArgs struct {
|
||||
Strings []string `json:"strings"`
|
||||
}
|
||||
|
||||
func (s SortArgs) InsertOpts() InsertOpts {
|
||||
return InsertOpts{
|
||||
Queue: QueueDefault,
|
||||
Priority: PriorityDefault,
|
||||
}
|
||||
}
|
||||
|
||||
func (SortArgs) Kind() string { return "sort" }
|
||||
func (a SortArgs) UniqueID() string { return a.Kind() }
|
||||
|
||||
var _ Worker[SortArgs] = (*SortWorker)(nil)
|
||||
|
||||
// @provider(job)
|
||||
type SortWorker struct {
|
||||
WorkerDefaults[SortArgs]
|
||||
}
|
||||
|
||||
func (w *SortWorker) Work(ctx context.Context, job *Job[SortArgs]) error {
|
||||
sort.Strings(job.Args.Strings)
|
||||
|
||||
log.Infof("[%s] Sorted strings: %v\n", time.Now().Format(time.TimeOnly), job.Args.Strings)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *SortWorker) NextRetry(job *Job[SortArgs]) time.Time {
|
||||
return time.Now().Add(5 * time.Second)
|
||||
}
|
||||
41
app/jobs/provider.gen.go
Executable file
41
app/jobs/provider.gen.go
Executable file
@@ -0,0 +1,41 @@
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"quyun/providers/job"
|
||||
|
||||
"github.com/riverqueue/river"
|
||||
"go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/container"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
"go.ipao.vip/atom/opt"
|
||||
)
|
||||
|
||||
func Provide(opts ...opt.Option) error {
|
||||
if err := container.Container.Provide(func(
|
||||
__job *job.Job,
|
||||
) (contracts.Initial, error) {
|
||||
obj := &CronJob{}
|
||||
if err := obj.Prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
container.Later(func() error { return __job.AddPeriodicJobs(obj) })
|
||||
|
||||
return obj, nil
|
||||
}, atom.GroupInitial); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := container.Container.Provide(func(
|
||||
__job *job.Job,
|
||||
) (contracts.Initial, error) {
|
||||
obj := &SortWorker{}
|
||||
if err := river.AddWorkerSafely(__job.Workers, obj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
}, atom.GroupInitial); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
9
app/middlewares/mid_debug.go
Normal file
9
app/middlewares/mid_debug.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package middlewares
|
||||
|
||||
import (
|
||||
"github.com/gofiber/fiber/v3"
|
||||
)
|
||||
|
||||
func (f *Middlewares) DebugMode(c fiber.Ctx) error {
|
||||
return c.Next()
|
||||
}
|
||||
15
app/middlewares/middlewares.go
Normal file
15
app/middlewares/middlewares.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package middlewares
|
||||
|
||||
import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// @provider
|
||||
type Middlewares struct {
|
||||
log *log.Entry `inject:"false"`
|
||||
}
|
||||
|
||||
func (f *Middlewares) Prepare() error {
|
||||
f.log = log.WithField("module", "middleware")
|
||||
return nil
|
||||
}
|
||||
20
app/middlewares/provider.gen.go
Executable file
20
app/middlewares/provider.gen.go
Executable file
@@ -0,0 +1,20 @@
|
||||
package middlewares
|
||||
|
||||
import (
|
||||
"go.ipao.vip/atom/container"
|
||||
"go.ipao.vip/atom/opt"
|
||||
)
|
||||
|
||||
func Provide(opts ...opt.Option) error {
|
||||
if err := container.Container.Provide(func() (*Middlewares, error) {
|
||||
obj := &Middlewares{}
|
||||
if err := obj.Prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
86
app/models/medias.go
Normal file
86
app/models/medias.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"quyun/app/requests"
|
||||
"quyun/database/schemas/public/model"
|
||||
"quyun/database/schemas/public/table"
|
||||
|
||||
. "github.com/go-jet/jet/v2/postgres"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// @provider
|
||||
type mediasModel struct {
|
||||
log *logrus.Entry `inject:"false"`
|
||||
}
|
||||
|
||||
func (m *mediasModel) Prepare() error {
|
||||
m.log = logrus.WithField("module", "mediasModel")
|
||||
return nil
|
||||
}
|
||||
|
||||
// countByCond
|
||||
func (m *mediasModel) countByCondition(ctx context.Context, expr BoolExpression) (int64, error) {
|
||||
var cnt struct {
|
||||
Cnt int64
|
||||
}
|
||||
|
||||
tbl := table.Medias
|
||||
stmt := SELECT(COUNT(tbl.ID).AS("cnt")).FROM(tbl).WHERE(expr)
|
||||
m.log.Infof("sql: %s", stmt.DebugSql())
|
||||
|
||||
err := stmt.QueryContext(ctx, db, &cnt)
|
||||
if err != nil {
|
||||
m.log.Errorf("error counting media items: %v", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return cnt.Cnt, nil
|
||||
}
|
||||
|
||||
func (m *mediasModel) List(ctx context.Context, pagination *requests.Pagination) (*requests.Pager, error) {
|
||||
limit := pagination.Limit
|
||||
offset := pagination.Offset()
|
||||
|
||||
tbl := table.Medias
|
||||
stmt := tbl.
|
||||
SELECT(tbl.AllColumns).
|
||||
ORDER_BY(tbl.ID.DESC()).
|
||||
LIMIT(limit).
|
||||
OFFSET(offset)
|
||||
m.log.Infof("sql: %s", stmt.DebugSql())
|
||||
|
||||
var medias []model.Medias
|
||||
err := stmt.QueryContext(ctx, db, &medias)
|
||||
if err != nil {
|
||||
m.log.Errorf("error querying media items: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
count, err := m.countByCondition(ctx, Bool(true))
|
||||
if err != nil {
|
||||
m.log.Errorf("error getting media count: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &requests.Pager{
|
||||
Items: medias,
|
||||
Total: count,
|
||||
Pagination: *pagination,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *mediasModel) Create(ctx context.Context, model *model.Medias) error {
|
||||
stmt := table.Medias.INSERT(table.Medias.MutableColumns).MODEL(model)
|
||||
m.log.Infof("sql: %s", stmt.DebugSql())
|
||||
|
||||
if _, err := stmt.ExecContext(ctx, db); err != nil {
|
||||
m.log.Errorf("error creating media item: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
m.log.Infof("media item created successfully")
|
||||
return nil
|
||||
}
|
||||
134
app/models/medias_test.go
Normal file
134
app/models/medias_test.go
Normal file
@@ -0,0 +1,134 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"quyun/app/requests"
|
||||
"quyun/app/service/testx"
|
||||
"quyun/database"
|
||||
"quyun/database/schemas/public/model"
|
||||
"quyun/database/schemas/public/table"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
|
||||
// . "github.com/go-jet/jet/v2/postgres"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"go.uber.org/dig"
|
||||
)
|
||||
|
||||
type MediasInjectParams struct {
|
||||
dig.In
|
||||
Initials []contracts.Initial `group:"initials"`
|
||||
}
|
||||
|
||||
type MediasTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
MediasInjectParams
|
||||
}
|
||||
|
||||
func Test_medias(t *testing.T) {
|
||||
providers := testx.Default().With(Provide)
|
||||
testx.Serve(providers, t, func(params MediasInjectParams) {
|
||||
suite.Run(t, &MediasTestSuite{MediasInjectParams: params})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *MediasTestSuite) Test_countByCondition() {
|
||||
Convey("countByCondition", s.T(), func() {
|
||||
Convey("no cond", func() {
|
||||
database.Truncate(context.Background(), db, table.Medias.TableName())
|
||||
|
||||
cnt, err := Medias.countByCondition(context.Background(), nil)
|
||||
Convey("should not return an error", func() {
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
Convey("should return a count of zero", func() {
|
||||
So(cnt, ShouldEqual, 0)
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *MediasTestSuite) Test_Create() {
|
||||
Convey("Create", s.T(), func() {
|
||||
Convey("valid media", func() {
|
||||
database.Truncate(context.Background(), db, table.Medias.TableName())
|
||||
|
||||
model := &model.Medias{
|
||||
Name: "test",
|
||||
CreatedAt: time.Now(),
|
||||
MimeType: "application/pdf",
|
||||
Size: 100,
|
||||
Path: "path/to/media.pdf",
|
||||
}
|
||||
|
||||
err := Medias.Create(context.Background(), model)
|
||||
Convey("Create should not return an error", func() {
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
|
||||
cnt, err := Medias.countByCondition(context.Background(), nil)
|
||||
Convey("Count should not return an error", func() {
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
Convey("should return a count of one", func() {
|
||||
So(cnt, ShouldEqual, 1)
|
||||
})
|
||||
Convey("should create the media successfully", func() {
|
||||
So(model.ID, ShouldNotBeEmpty)
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *MediasTestSuite) Test_Page() {
|
||||
Convey("Create", s.T(), func() {
|
||||
Convey("Insert Items", func() {
|
||||
database.Truncate(context.Background(), db, table.Medias.TableName())
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
model := &model.Medias{
|
||||
Name: fmt.Sprintf("test-%d", i),
|
||||
CreatedAt: time.Now(),
|
||||
MimeType: "application/pdf",
|
||||
Size: 100,
|
||||
Path: "path/to/media.pdf",
|
||||
}
|
||||
|
||||
err := Medias.Create(context.Background(), model)
|
||||
So(err, ShouldBeNil)
|
||||
}
|
||||
|
||||
cnt, err := Medias.countByCondition(context.Background(), nil)
|
||||
So(err, ShouldBeNil)
|
||||
So(cnt, ShouldEqual, 20)
|
||||
})
|
||||
|
||||
Convey("Page", func() {
|
||||
Convey("page 1", func() {
|
||||
pager, err := Medias.List(context.Background(), &requests.Pagination{Page: 1, Limit: 10})
|
||||
So(err, ShouldBeNil)
|
||||
So(pager.Total, ShouldEqual, 20)
|
||||
So(pager.Items, ShouldHaveLength, 10)
|
||||
})
|
||||
Convey("page 2", func() {
|
||||
pager, err := Medias.List(context.Background(), &requests.Pagination{Page: 2, Limit: 10})
|
||||
So(err, ShouldBeNil)
|
||||
So(pager.Total, ShouldEqual, 20)
|
||||
So(pager.Items, ShouldHaveLength, 10)
|
||||
})
|
||||
|
||||
Convey("page 3", func() {
|
||||
pager, err := Medias.List(context.Background(), &requests.Pagination{Page: 3, Limit: 10})
|
||||
So(err, ShouldBeNil)
|
||||
So(pager.Total, ShouldEqual, 20)
|
||||
So(pager.Items, ShouldBeEmpty)
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
8
app/models/migrations.go
Normal file
8
app/models/migrations.go
Normal file
@@ -0,0 +1,8 @@
|
||||
package models
|
||||
|
||||
// @provider
|
||||
type migrationsModel struct{}
|
||||
|
||||
func (m *migrationsModel) Prepare() error {
|
||||
return nil
|
||||
}
|
||||
26
app/models/models.gen.go
Normal file
26
app/models/models.gen.go
Normal file
@@ -0,0 +1,26 @@
|
||||
// Code generated by the atomctl ; DO NOT EDIT.
|
||||
// Code generated by the atomctl ; DO NOT EDIT.
|
||||
// Code generated by the atomctl ; DO NOT EDIT.
|
||||
package models
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
var db *sql.DB
|
||||
var Medias *mediasModel
|
||||
var Migrations *migrationsModel
|
||||
|
||||
// @provider(model)
|
||||
type models struct {
|
||||
db *sql.DB
|
||||
medias *mediasModel
|
||||
migrations *migrationsModel
|
||||
}
|
||||
|
||||
func (m *models) Prepare() error {
|
||||
db = m.db
|
||||
Medias = m.medias
|
||||
Migrations = m.migrations
|
||||
return nil
|
||||
}
|
||||
49
app/models/provider.gen.go
Executable file
49
app/models/provider.gen.go
Executable file
@@ -0,0 +1,49 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/container"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
"go.ipao.vip/atom/opt"
|
||||
)
|
||||
|
||||
func Provide(opts ...opt.Option) error {
|
||||
if err := container.Container.Provide(func() (*mediasModel, error) {
|
||||
obj := &mediasModel{}
|
||||
if err := obj.Prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := container.Container.Provide(func() (*migrationsModel, error) {
|
||||
obj := &migrationsModel{}
|
||||
|
||||
return obj, nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := container.Container.Provide(func(
|
||||
db *sql.DB,
|
||||
medias *mediasModel,
|
||||
migrations *migrationsModel,
|
||||
) (contracts.Initial, error) {
|
||||
obj := &models{
|
||||
db: db,
|
||||
medias: medias,
|
||||
migrations: migrations,
|
||||
}
|
||||
if err := obj.Prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
}, atom.GroupInitial); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
30
app/requests/pagination.go
Normal file
30
app/requests/pagination.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package requests
|
||||
|
||||
import "github.com/samber/lo"
|
||||
|
||||
type Pager struct {
|
||||
Pagination `json:",inline"`
|
||||
Total int64 `json:"total"`
|
||||
Items any `json:"items"`
|
||||
}
|
||||
|
||||
type Pagination struct {
|
||||
Page int64 `json:"page" form:"page" query:"page"`
|
||||
Limit int64 `json:"limit" form:"limit" query:"limit"`
|
||||
}
|
||||
|
||||
func (filter *Pagination) Offset() int64 {
|
||||
return (filter.Page - 1) * filter.Limit
|
||||
}
|
||||
|
||||
func (filter *Pagination) Format() *Pagination {
|
||||
if filter.Page <= 0 {
|
||||
filter.Page = 1
|
||||
}
|
||||
|
||||
if !lo.Contains([]int64{10, 20, 50, 100}, filter.Limit) {
|
||||
filter.Limit = 10
|
||||
}
|
||||
|
||||
return filter
|
||||
}
|
||||
41
app/requests/sort.go
Normal file
41
app/requests/sort.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package requests
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
||||
type SortQueryFilter struct {
|
||||
Asc *string `json:"asc" form:"asc"`
|
||||
Desc *string `json:"desc" form:"desc"`
|
||||
}
|
||||
|
||||
func (s *SortQueryFilter) AscFields() []string {
|
||||
if s.Asc == nil {
|
||||
return nil
|
||||
}
|
||||
return strings.Split(*s.Asc, ",")
|
||||
}
|
||||
|
||||
func (s *SortQueryFilter) DescFields() []string {
|
||||
if s.Desc == nil {
|
||||
return nil
|
||||
}
|
||||
return strings.Split(*s.Desc, ",")
|
||||
}
|
||||
|
||||
func (s *SortQueryFilter) DescID() *SortQueryFilter {
|
||||
if s.Desc == nil {
|
||||
s.Desc = lo.ToPtr("id")
|
||||
}
|
||||
|
||||
items := s.DescFields()
|
||||
if lo.Contains(items, "id") {
|
||||
return s
|
||||
}
|
||||
|
||||
items = append(items, "id")
|
||||
s.Desc = lo.ToPtr(strings.Join(items, ","))
|
||||
return s
|
||||
}
|
||||
58
app/service/event/event.go
Normal file
58
app/service/event/event.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/container"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
"quyun/app/events/subscribers"
|
||||
"quyun/app/service"
|
||||
"quyun/providers/app"
|
||||
"quyun/providers/event"
|
||||
"quyun/providers/postgres"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/dig"
|
||||
)
|
||||
|
||||
func defaultProviders() container.Providers {
|
||||
return service.Default(container.Providers{
|
||||
postgres.DefaultProvider(),
|
||||
}...)
|
||||
}
|
||||
|
||||
func Command() atom.Option {
|
||||
return atom.Command(
|
||||
atom.Name("event"),
|
||||
atom.Short("start event processor"),
|
||||
atom.RunE(Serve),
|
||||
atom.Providers(
|
||||
defaultProviders().
|
||||
With(
|
||||
subscribers.Provide,
|
||||
),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
dig.In
|
||||
|
||||
App *app.Config
|
||||
PubSub *event.PubSub
|
||||
Initials []contracts.Initial `group:"initials"`
|
||||
}
|
||||
|
||||
func Serve(cmd *cobra.Command, args []string) error {
|
||||
return container.Container.Invoke(func(ctx context.Context, svc Service) error {
|
||||
log.SetFormatter(&log.JSONFormatter{})
|
||||
|
||||
if svc.App.IsDevMode() {
|
||||
log.SetLevel(log.DebugLevel)
|
||||
}
|
||||
|
||||
return svc.PubSub.Serve(ctx)
|
||||
})
|
||||
}
|
||||
57
app/service/grpc/grpc.go
Normal file
57
app/service/grpc/grpc.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/container"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
"quyun/app/grpc/users"
|
||||
"quyun/app/service"
|
||||
"quyun/providers/app"
|
||||
"quyun/providers/grpc"
|
||||
"quyun/providers/postgres"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/dig"
|
||||
)
|
||||
|
||||
func defaultProviders() container.Providers {
|
||||
return service.Default(container.Providers{
|
||||
postgres.DefaultProvider(),
|
||||
grpc.DefaultProvider(),
|
||||
}...)
|
||||
}
|
||||
|
||||
func Command() atom.Option {
|
||||
return atom.Command(
|
||||
atom.Name("grpc"),
|
||||
atom.Short("run grpc server"),
|
||||
atom.RunE(Serve),
|
||||
atom.Providers(
|
||||
defaultProviders().
|
||||
With(
|
||||
users.Provide,
|
||||
),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
dig.In
|
||||
|
||||
App *app.Config
|
||||
Grpc *grpc.Grpc
|
||||
Initials []contracts.Initial `group:"initials"`
|
||||
}
|
||||
|
||||
func Serve(cmd *cobra.Command, args []string) error {
|
||||
return container.Container.Invoke(func(svc Service) error {
|
||||
log.SetFormatter(&log.JSONFormatter{})
|
||||
|
||||
if svc.App.IsDevMode() {
|
||||
log.SetLevel(log.DebugLevel)
|
||||
}
|
||||
|
||||
return svc.Grpc.Serve()
|
||||
})
|
||||
}
|
||||
86
app/service/http/http.go
Normal file
86
app/service/http/http.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"quyun/app/errorx"
|
||||
appHttp "quyun/app/http"
|
||||
"quyun/app/jobs"
|
||||
"quyun/app/service"
|
||||
_ "quyun/docs"
|
||||
"quyun/providers/app"
|
||||
"quyun/providers/hashids"
|
||||
"quyun/providers/http"
|
||||
"quyun/providers/http/swagger"
|
||||
"quyun/providers/job"
|
||||
"quyun/providers/jwt"
|
||||
"quyun/providers/postgres"
|
||||
|
||||
"go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/container"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
|
||||
"github.com/gofiber/fiber/v3/middleware/favicon"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/dig"
|
||||
)
|
||||
|
||||
func defaultProviders() container.Providers {
|
||||
return service.Default(container.Providers{
|
||||
http.DefaultProvider(),
|
||||
postgres.DefaultProvider(),
|
||||
jwt.DefaultProvider(),
|
||||
hashids.DefaultProvider(),
|
||||
job.DefaultProvider(),
|
||||
}...)
|
||||
}
|
||||
|
||||
func Command() atom.Option {
|
||||
return atom.Command(
|
||||
atom.Name("serve"),
|
||||
atom.Short("run http server"),
|
||||
atom.RunE(Serve),
|
||||
atom.Providers(
|
||||
defaultProviders().
|
||||
With(
|
||||
jobs.Provide,
|
||||
appHttp.Provide,
|
||||
),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
dig.In
|
||||
|
||||
Initials []contracts.Initial `group:"initials"`
|
||||
|
||||
App *app.Config
|
||||
Job *job.Job
|
||||
Http *http.Service
|
||||
Routes []contracts.HttpRoute `group:"routes"`
|
||||
}
|
||||
|
||||
func Serve(cmd *cobra.Command, args []string) error {
|
||||
return container.Container.Invoke(func(ctx context.Context, svc Service) error {
|
||||
log.SetFormatter(&log.JSONFormatter{})
|
||||
|
||||
if svc.App.Mode == app.AppModeDevelopment {
|
||||
log.SetLevel(log.DebugLevel)
|
||||
|
||||
svc.Http.Engine.Get("/swagger/*", swagger.HandlerDefault)
|
||||
}
|
||||
svc.Http.Engine.Use(errorx.Middleware)
|
||||
svc.Http.Engine.Use(favicon.New(favicon.Config{
|
||||
Data: []byte{},
|
||||
}))
|
||||
|
||||
group := svc.Http.Engine.Group("")
|
||||
for _, route := range svc.Routes {
|
||||
route.Register(group)
|
||||
}
|
||||
|
||||
return svc.Http.Serve()
|
||||
})
|
||||
}
|
||||
60
app/service/migrate/migrate.go
Normal file
60
app/service/migrate/migrate.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package migrate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"quyun/app/service"
|
||||
"quyun/database"
|
||||
"quyun/providers/postgres"
|
||||
|
||||
"github.com/pressly/goose/v3"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/container"
|
||||
"go.uber.org/dig"
|
||||
)
|
||||
|
||||
func defaultProviders() container.Providers {
|
||||
return service.Default(container.Providers{
|
||||
postgres.DefaultProvider(),
|
||||
}...)
|
||||
}
|
||||
|
||||
func Command() atom.Option {
|
||||
return atom.Command(
|
||||
atom.Name("migrate"),
|
||||
atom.Short("run migrations"),
|
||||
atom.RunE(Serve),
|
||||
atom.Providers(defaultProviders()),
|
||||
atom.Example("migrate [up|up-by-one|up-to|create|down|down-to|fix|redo|reset|status|version]"),
|
||||
)
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
dig.In
|
||||
|
||||
DB *sql.DB
|
||||
}
|
||||
|
||||
// migrate
|
||||
func Serve(cmd *cobra.Command, args []string) error {
|
||||
return container.Container.Invoke(func(ctx context.Context, svc Service) error {
|
||||
if len(args) == 0 {
|
||||
args = append(args, "up")
|
||||
}
|
||||
|
||||
if args[0] == "create" {
|
||||
return nil
|
||||
}
|
||||
|
||||
action, args := args[0], args[1:]
|
||||
log.Infof("migration action: %s args: %+v", action, args)
|
||||
|
||||
goose.SetBaseFS(database.MigrationFS)
|
||||
goose.SetTableName("migrations")
|
||||
|
||||
return goose.RunContext(context.Background(), action, svc.DB, "migrations", args...)
|
||||
})
|
||||
}
|
||||
24
app/service/queue/error.go
Normal file
24
app/service/queue/error.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/riverqueue/river"
|
||||
"github.com/riverqueue/river/rivertype"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type CustomErrorHandler struct{}
|
||||
|
||||
func (*CustomErrorHandler) HandleError(ctx context.Context, job *rivertype.JobRow, err error) *river.ErrorHandlerResult {
|
||||
log.Infof("Job errored with: %s\n", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*CustomErrorHandler) HandlePanic(ctx context.Context, job *rivertype.JobRow, panicVal any, trace string) *river.ErrorHandlerResult {
|
||||
log.Infof("Job panicked with: %v\n", panicVal)
|
||||
log.Infof("Stack trace: %s\n", trace)
|
||||
return &river.ErrorHandlerResult{
|
||||
SetCancelled: true,
|
||||
}
|
||||
}
|
||||
66
app/service/queue/river.go
Normal file
66
app/service/queue/river.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/container"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
"quyun/app/jobs"
|
||||
"quyun/app/service"
|
||||
"quyun/providers/app"
|
||||
"quyun/providers/job"
|
||||
"quyun/providers/postgres"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/dig"
|
||||
)
|
||||
|
||||
func defaultProviders() container.Providers {
|
||||
return service.Default(container.Providers{
|
||||
postgres.DefaultProvider(),
|
||||
job.DefaultProvider(),
|
||||
}...)
|
||||
}
|
||||
|
||||
func Command() atom.Option {
|
||||
return atom.Command(
|
||||
atom.Name("queue"),
|
||||
atom.Short("start queue processor"),
|
||||
atom.RunE(Serve),
|
||||
atom.Providers(
|
||||
defaultProviders().
|
||||
With(
|
||||
jobs.Provide,
|
||||
),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
dig.In
|
||||
|
||||
App *app.Config
|
||||
Job *job.Job
|
||||
Initials []contracts.Initial `group:"initials"`
|
||||
CronJobs []contracts.CronJob `group:"cron_jobs"`
|
||||
}
|
||||
|
||||
func Serve(cmd *cobra.Command, args []string) error {
|
||||
return container.Container.Invoke(func(ctx context.Context, svc Service) error {
|
||||
log.SetFormatter(&log.JSONFormatter{})
|
||||
|
||||
if svc.App.IsDevMode() {
|
||||
log.SetLevel(log.DebugLevel)
|
||||
}
|
||||
|
||||
if err := svc.Job.Start(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer svc.Job.Close()
|
||||
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
})
|
||||
}
|
||||
14
app/service/service.go
Normal file
14
app/service/service.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"go.ipao.vip/atom/container"
|
||||
"quyun/providers/app"
|
||||
"quyun/providers/event"
|
||||
)
|
||||
|
||||
func Default(providers ...container.ProviderContainer) container.Providers {
|
||||
return append(container.Providers{
|
||||
app.DefaultProvider(),
|
||||
event.DefaultProvider(),
|
||||
}, providers...)
|
||||
}
|
||||
36
app/service/testx/testing.go
Normal file
36
app/service/testx/testing.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package testx
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"quyun/providers/app"
|
||||
"quyun/providers/postgres"
|
||||
|
||||
"go.ipao.vip/atom"
|
||||
"go.ipao.vip/atom/container"
|
||||
|
||||
"github.com/rogeecn/fabfile"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
|
||||
func Default(providers ...container.ProviderContainer) container.Providers {
|
||||
return append(container.Providers{
|
||||
app.DefaultProvider(),
|
||||
postgres.DefaultProvider(),
|
||||
}, providers...)
|
||||
}
|
||||
|
||||
func Serve(providers container.Providers, t *testing.T, invoke any) {
|
||||
Convey("tests boot up", t, func() {
|
||||
file := fabfile.MustFind("config.toml")
|
||||
|
||||
localEnv := os.Getenv("ENV_LOCAL")
|
||||
if localEnv != "" {
|
||||
file = fabfile.MustFind("config." + localEnv + ".toml")
|
||||
}
|
||||
|
||||
So(atom.LoadProviders(file, providers), ShouldBeNil)
|
||||
So(container.Container.Invoke(invoke), ShouldBeNil)
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user