diff --git a/pkg/ast/provider/provider.go b/pkg/ast/provider/provider.go index 2271c2f..1740438 100644 --- a/pkg/ast/provider/provider.go +++ b/pkg/ast/provider/provider.go @@ -163,14 +163,33 @@ func Parse(source string) []Provider { provider.ReturnType = "*" + provider.StructName } + if providerDoc.Mode == "event" { + provider.Mode = "event" + + modePkg := gomod.GetModuleName() + "/providers/events" + + provider.Imports["git.ipao.vip/rogeecn/atom/contracts"] = "" + provider.Imports[modePkg] = "" + + provider.ProviderGroup = "atom.GroupInitial" + provider.ReturnType = "contracts.Initial" + + provider.InjectParams["__event"] = InjectParam{ + Star: "*", + Type: "PubSub", + Package: modePkg, + PackageAlias: "events", + } + } + if providerDoc.Mode == "job" { provider.Mode = "job" - jobPkg := gomod.GetModuleName() + "/providers/job" + modePkg := gomod.GetModuleName() + "/providers/job" provider.Imports["git.ipao.vip/rogeecn/atom/contracts"] = "" provider.Imports["github.com/riverqueue/river"] = "" - provider.Imports[jobPkg] = "" + provider.Imports[modePkg] = "" provider.ProviderGroup = "atom.GroupInitial" provider.ReturnType = "contracts.Initial" @@ -178,7 +197,7 @@ func Parse(source string) []Provider { provider.InjectParams["__job"] = InjectParam{ Star: "*", Type: "Job", - Package: jobPkg, + Package: modePkg, PackageAlias: "job", } } diff --git a/pkg/ast/provider/provider.go.tpl b/pkg/ast/provider/provider.go.tpl index d82263e..fa81884 100644 --- a/pkg/ast/provider/provider.go.tpl +++ b/pkg/ast/provider/provider.go.tpl @@ -19,7 +19,7 @@ func Provide(opts ...opt.Option) error { ) ({{.ReturnType}}, error) { obj := &{{.StructName}}{ {{- range $key, $param := .InjectParams }} - {{- if ne $key "__job"}} + {{- if and (ne $key "__job") (ne $key "__event")}} {{$key}}: {{$key}}, {{- end}} {{- end }} @@ -30,6 +30,10 @@ func Provide(opts ...opt.Option) error { } {{- end }} + {{- if eq .Mode "event"}} + __event.Handle("handler:{{.StructName}}", obj.Topic(), obj.PublishToTopic(), obj.Handler) + {{- end }} + {{- if eq .Mode "job"}} if err := river.AddWorkerSafely(__job.Workers, obj); err != nil { return nil, err diff --git a/templates/project/config.toml.tpl b/templates/project/config.toml.tpl index e3861f7..3032ce9 100755 --- a/templates/project/config.toml.tpl +++ b/templates/project/config.toml.tpl @@ -5,14 +5,6 @@ BaseURI = "baseURI" [Http] Port = 8080 -[Swagger] -BaseRoute = "doc" -Title = "Api" -Description = "Api Docs" -BasePath = "/v1" -Version = "1.0.0" - - [Database] Host = "10.1.1.1" Database = "postgres" @@ -25,3 +17,9 @@ SigningKey = "Key" [HashIDs] Salt = "Salt" + +[Redis] +Host = "" +Port = 6379 +Password = "hello" +DB = 0 \ No newline at end of file diff --git a/templates/project/pkg/service/event/event.go.tpl b/templates/project/pkg/service/event/event.go.tpl new file mode 100644 index 0000000..50df8d0 --- /dev/null +++ b/templates/project/pkg/service/event/event.go.tpl @@ -0,0 +1,58 @@ +package event + +import ( + "context" + "qq/app/events" + "qq/pkg/service" + "qq/providers/app" + "qq/providers/postgres" + + providerEvents "qq/providers/events" + + "git.ipao.vip/rogeecn/atom" + "git.ipao.vip/rogeecn/atom/container" + "git.ipao.vip/rogeecn/atom/contracts" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "go.uber.org/dig" +) + +func defaultProviders() container.Providers { + return service.Default(container.Providers{ + postgres.DefaultProvider(), + }...) +} + +func Command() atom.Option { + return atom.Command( + atom.Name("event"), + atom.Short("start event processor"), + atom.RunE(Serve), + atom.Providers( + defaultProviders(). + With( + events.Provide, + ), + ), + ) +} + +type Service struct { + dig.In + + App *app.Config + PubSub *providerEvents.PubSub + Initials []contracts.Initial `group:"initials"` +} + +func Serve(cmd *cobra.Command, args []string) error { + return container.Container.Invoke(func(ctx context.Context, svc Service) error { + log.SetFormatter(&log.JSONFormatter{}) + + if svc.App.IsDevMode() { + log.SetLevel(log.DebugLevel) + } + + return svc.PubSub.Serve(ctx) + }) +} diff --git a/templates/project/pkg/service/service.go.tpl b/templates/project/pkg/service/service.go.tpl index b44a874..2829ec2 100644 --- a/templates/project/pkg/service/service.go.tpl +++ b/templates/project/pkg/service/service.go.tpl @@ -2,6 +2,7 @@ package service import ( "{{.ModuleName}}/providers/app" + "{{.ModuleName}}/providers/events" "git.ipao.vip/rogeecn/atom/container" ) @@ -9,5 +10,6 @@ import ( func Default(providers ...container.ProviderContainer) container.Providers { return append(container.Providers{ app.DefaultProvider(), + events.DefaultProvider(), }, providers...) } diff --git a/templates/project/providers/events/config.go.tpl b/templates/project/providers/events/config.go.tpl new file mode 100644 index 0000000..69c79a2 --- /dev/null +++ b/templates/project/providers/events/config.go.tpl @@ -0,0 +1,48 @@ +package events + +import ( + "context" + + "git.ipao.vip/rogeecn/atom/container" + "git.ipao.vip/rogeecn/atom/utils/opt" + "github.com/ThreeDotsLabs/watermill/message" +) + +const DefaultPrefix = "Events" + +func DefaultProvider() container.ProviderContainer { + return container.ProviderContainer{ + Provider: Provide, + Options: []opt.Option{ + opt.Prefix(DefaultPrefix), + }, + } +} + +type Config struct { + ConsumerGroup string + + Brokers []string +} + +type PubSub struct { + Publisher message.Publisher + Subscriber message.Subscriber + Router *message.Router +} + +func (ps *PubSub) Serve(ctx context.Context) error { + if err := ps.Router.Run(ctx); err != nil { + return err + } + return nil +} + +func (ps *PubSub) Handle( + handlerName string, + consumerTopic string, + publisherTopic string, + handler message.HandlerFunc, +) { + ps.Router.AddHandler(handlerName, consumerTopic, ps.Subscriber, publisherTopic, ps.Publisher, handler) +} diff --git a/templates/project/providers/events/logrus_adapter.go.tpl b/templates/project/providers/events/logrus_adapter.go.tpl new file mode 100644 index 0000000..a47ab64 --- /dev/null +++ b/templates/project/providers/events/logrus_adapter.go.tpl @@ -0,0 +1,60 @@ +package events + +import ( + "github.com/ThreeDotsLabs/watermill" + "github.com/sirupsen/logrus" +) + +// LogrusLoggerAdapter is a watermill logger adapter for logrus. +type LogrusLoggerAdapter struct { + log *logrus.Logger + fields watermill.LogFields +} + +// NewLogrusLogger returns a LogrusLoggerAdapter that sends all logs to +// the passed logrus instance. +func LogrusAdapter() watermill.LoggerAdapter { + return &LogrusLoggerAdapter{log: logrus.StandardLogger()} +} + +// Error logs on level error with err as field and optional fields. +func (l *LogrusLoggerAdapter) Error(msg string, err error, fields watermill.LogFields) { + l.createEntry(fields.Add(watermill.LogFields{"err": err})).Error(msg) +} + +// Info logs on level info with optional fields. +func (l *LogrusLoggerAdapter) Info(msg string, fields watermill.LogFields) { + l.createEntry(fields).Info(msg) +} + +// Debug logs on level debug with optional fields. +func (l *LogrusLoggerAdapter) Debug(msg string, fields watermill.LogFields) { + l.createEntry(fields).Debug(msg) +} + +// Trace logs on level trace with optional fields. +func (l *LogrusLoggerAdapter) Trace(msg string, fields watermill.LogFields) { + l.createEntry(fields).Trace(msg) +} + +// With returns a new LogrusLoggerAdapter that includes fields +// to be re-used between logging statements. +func (l *LogrusLoggerAdapter) With(fields watermill.LogFields) watermill.LoggerAdapter { + return &LogrusLoggerAdapter{ + log: l.log, + fields: l.fields.Add(fields), + } +} + +// createEntry is a helper to add fields to a logrus entry if necessary. +func (l *LogrusLoggerAdapter) createEntry(fields watermill.LogFields) *logrus.Entry { + entry := logrus.NewEntry(l.log) + + allFields := fields.Add(l.fields) + + if len(allFields) > 0 { + entry = entry.WithFields(logrus.Fields(allFields)) + } + + return entry +} diff --git a/templates/project/providers/events/provider.go.tpl b/templates/project/providers/events/provider.go.tpl new file mode 100644 index 0000000..c228926 --- /dev/null +++ b/templates/project/providers/events/provider.go.tpl @@ -0,0 +1,32 @@ +package events + +import ( + "git.ipao.vip/rogeecn/atom/container" + "git.ipao.vip/rogeecn/atom/utils/opt" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" +) + +func Provide(opts ...opt.Option) error { + o := opt.New(opts...) + var config Config + if err := o.UnmarshalConfig(&config); err != nil { + return err + } + + return container.Container.Provide(func() (*PubSub, error) { + logger := LogrusAdapter() + + client := gochannel.NewGoChannel(gochannel.Config{}, logger) + router, err := message.NewRouter(message.RouterConfig{}, logger) + if err != nil { + return nil, err + } + + return &PubSub{ + Publisher: client, + Subscriber: client, + Router: router, + }, nil + }, o.DiOptions()...) +} diff --git a/templates/project/providers/events/provider_kafka.go.tpl b/templates/project/providers/events/provider_kafka.go.tpl new file mode 100644 index 0000000..6f58f97 --- /dev/null +++ b/templates/project/providers/events/provider_kafka.go.tpl @@ -0,0 +1,48 @@ +package events + +import ( + "git.ipao.vip/rogeecn/atom/container" + "git.ipao.vip/rogeecn/atom/utils/opt" + "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka" + "github.com/ThreeDotsLabs/watermill/message" +) + +func ProvideKafka(opts ...opt.Option) error { + o := opt.New(opts...) + var config Config + if err := o.UnmarshalConfig(&config); err != nil { + return err + } + + return container.Container.Provide(func() (*PubSub, error) { + logger := LogrusAdapter() + + publisher, err := kafka.NewPublisher(kafka.PublisherConfig{ + Brokers: config.Brokers, + Marshaler: kafka.DefaultMarshaler{}, + }, logger) + if err != nil { + return nil, err + } + + subscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{ + Brokers: config.Brokers, + Unmarshaler: kafka.DefaultMarshaler{}, + ConsumerGroup: config.ConsumerGroup, + }, logger) + if err != nil { + return nil, err + } + + router, err := message.NewRouter(message.RouterConfig{}, logger) + if err != nil { + return nil, err + } + + return &PubSub{ + Publisher: publisher, + Subscriber: subscriber, + Router: router, + }, nil + }, o.DiOptions()...) +} diff --git a/templates/project/providers/events/provider_redis.go.tpl b/templates/project/providers/events/provider_redis.go.tpl new file mode 100644 index 0000000..0d48bb4 --- /dev/null +++ b/templates/project/providers/events/provider_redis.go.tpl @@ -0,0 +1,49 @@ +package events + +import ( + "git.ipao.vip/rogeecn/atom/container" + "git.ipao.vip/rogeecn/atom/utils/opt" + "github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/redis/go-redis/v9" +) + +func ProvideRedis(opts ...opt.Option) error { + o := opt.New(opts...) + var config Config + if err := o.UnmarshalConfig(&config); err != nil { + return err + } + + return container.Container.Provide(func(rdb redis.UniversalClient) (*PubSub, error) { + logger := LogrusAdapter() + + subscriber, err := redisstream.NewSubscriber(redisstream.SubscriberConfig{ + Client: rdb, + Unmarshaller: redisstream.DefaultMarshallerUnmarshaller{}, + ConsumerGroup: config.ConsumerGroup, + }, logger) + if err != nil { + return nil, err + } + + publisher, err := redisstream.NewPublisher(redisstream.PublisherConfig{ + Client: rdb, + Marshaller: redisstream.DefaultMarshallerUnmarshaller{}, + }, logger) + if err != nil { + return nil, err + } + + router, err := message.NewRouter(message.RouterConfig{}, logger) + if err != nil { + return nil, err + } + + return &PubSub{ + Publisher: publisher, + Subscriber: subscriber, + Router: router, + }, nil + }, o.DiOptions()...) +} diff --git a/templates/project/providers/events/provider_sql.go.tpl b/templates/project/providers/events/provider_sql.go.tpl new file mode 100644 index 0000000..b862313 --- /dev/null +++ b/templates/project/providers/events/provider_sql.go.tpl @@ -0,0 +1,49 @@ +package events + +import ( + sqlDB "database/sql" + + "git.ipao.vip/rogeecn/atom/container" + "git.ipao.vip/rogeecn/atom/utils/opt" + "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" + "github.com/ThreeDotsLabs/watermill/message" +) + +func ProvideSQL(opts ...opt.Option) error { + o := opt.New(opts...) + var config Config + if err := o.UnmarshalConfig(&config); err != nil { + return err + } + + return container.Container.Provide(func(db *sqlDB.DB) (*PubSub, error) { + logger := LogrusAdapter() + + publisher, err := sql.NewPublisher(db, sql.PublisherConfig{ + SchemaAdapter: sql.DefaultPostgreSQLSchema{}, + AutoInitializeSchema: false, + }, logger) + if err != nil { + return nil, err + } + + subscriber, err := sql.NewSubscriber(db, sql.SubscriberConfig{ + SchemaAdapter: sql.DefaultPostgreSQLSchema{}, + ConsumerGroup: config.ConsumerGroup, + }, logger) + if err != nil { + return nil, err + } + + router, err := message.NewRouter(message.RouterConfig{}, logger) + if err != nil { + return nil, err + } + + return &PubSub{ + Publisher: publisher, + Subscriber: subscriber, + Router: router, + }, nil + }, o.DiOptions()...) +} diff --git a/templates/project/providers/redis/config.go.tpl b/templates/project/providers/redis/config.go.tpl new file mode 100644 index 0000000..bf5eb45 --- /dev/null +++ b/templates/project/providers/redis/config.go.tpl @@ -0,0 +1,44 @@ +package redis + +import ( + "fmt" + + "git.ipao.vip/rogeecn/atom/container" + "git.ipao.vip/rogeecn/atom/utils/opt" +) + +const DefaultPrefix = "Redis" + +func DefaultProvider() container.ProviderContainer { + return container.ProviderContainer{ + Provider: Provide, + Options: []opt.Option{ + opt.Prefix(DefaultPrefix), + }, + } +} + +type Config struct { + Host string + Port uint + Password string + DB uint +} + +func (c *Config) format() { + if c.Host == "" { + c.Host = "localhost" + } + + if c.Port == 0 { + c.Port = 6379 + } + + if c.DB == 0 { + c.DB = 0 + } +} + +func (c *Config) Addr() string { + return fmt.Sprintf("%s:%d", c.Host, c.Port) +} diff --git a/templates/project/providers/redis/provider.go.tpl b/templates/project/providers/redis/provider.go.tpl new file mode 100644 index 0000000..fc3ba22 --- /dev/null +++ b/templates/project/providers/redis/provider.go.tpl @@ -0,0 +1,33 @@ +package redis + +import ( + "context" + "time" + + "git.ipao.vip/rogeecn/atom/container" + "git.ipao.vip/rogeecn/atom/utils/opt" + "github.com/redis/go-redis/v9" +) + +func Provide(opts ...opt.Option) error { + o := opt.New(opts...) + var config Config + if err := o.UnmarshalConfig(&config); err != nil { + return err + } + config.format() + return container.Container.Provide(func() (redis.UniversalClient, error) { + rdb := redis.NewClient(&redis.Options{ + Addr: config.Addr(), + Password: config.Password, + DB: int(config.DB), + }) + + ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + if _, err := rdb.Ping(ctx).Result(); err != nil { + return nil, err + } + + return rdb, nil + }, o.DiOptions()...) +}