diff --git a/pkg/ast/provider/provider.go.tpl b/pkg/ast/provider/provider.go.tpl index 4f39a18..f1a2b50 100644 --- a/pkg/ast/provider/provider.go.tpl +++ b/pkg/ast/provider/provider.go.tpl @@ -35,7 +35,7 @@ func Provide(opts ...opt.Option) error { {{- end }} {{- if eq .Mode "event"}} - __event.Handle("handler:{{.StructName}}", obj.Topic(), obj.PublishToTopic(), obj.Handler) + __event.Handle("handler:{{.StructName}}", obj) {{- end }} {{- if eq .Mode "job"}} diff --git a/templates/events/publisher.go.tpl b/templates/events/publisher.go.tpl index 7aa0b83..2946426 100644 --- a/templates/events/publisher.go.tpl +++ b/templates/events/publisher.go.tpl @@ -4,6 +4,7 @@ import ( "encoding/json" "{{.ModuleName}}/app/events" + "{{.ModuleName}}/providers/event" "go.ipao.vip/atom/contracts" ) @@ -11,6 +12,8 @@ import ( var _ contracts.EventPublisher = (*{{.Name}}Event)(nil) type {{.Name}}Event struct { + event.DefaultChannel + ID int64 `json:"id"` } diff --git a/templates/events/subscriber.go.tpl b/templates/events/subscriber.go.tpl index 6ae98ac..7ce14c1 100644 --- a/templates/events/subscriber.go.tpl +++ b/templates/events/subscriber.go.tpl @@ -5,6 +5,7 @@ import ( "{{.ModuleName}}/app/events" "{{.ModuleName}}/app/events/publishers" + "{{.ModuleName}}/providers/event" "go.ipao.vip/atom/contracts" "github.com/ThreeDotsLabs/watermill/message" @@ -15,6 +16,9 @@ var _ contracts.EventHandler = (*{{.Name}}Subscriber)(nil) // @provider(event) type {{.Name}}Subscriber struct { + event.DefaultChannel + event.DefaultPublishTo + log *logrus.Entry `inject:"false"` } @@ -23,11 +27,6 @@ func (e *{{.Name}}Subscriber) Prepare() error { return nil } -// PublishToTopic implements contracts.EventHandler. -func (e *{{.Name}}Subscriber) PublishToTopic() string { - return events.TopicProcessed -} - // Topic implements contracts.EventHandler. func (e *{{.Name}}Subscriber) Topic() string { return events.Topic{{.Name}} diff --git a/templates/project/app/events/publishers/user_register.go.tpl b/templates/project/app/events/publishers/user_register.go.tpl index 5f998a2..8cb5455 100644 --- a/templates/project/app/events/publishers/user_register.go.tpl +++ b/templates/project/app/events/publishers/user_register.go.tpl @@ -3,13 +3,17 @@ package publishers import ( "encoding/json" - "go.ipao.vip/atom/contracts" "{{.ModuleName}}/app/events" + "{{.ModuleName}}/providers/event" + + "go.ipao.vip/atom/contracts" ) var _ contracts.EventPublisher = (*UserRegister)(nil) type UserRegister struct { + event.DefaultChannel + ID int64 `json:"id"` } diff --git a/templates/project/app/events/subscribers/user_register.go.tpl b/templates/project/app/events/subscribers/user_register.go.tpl index 2ab7683..32eac39 100644 --- a/templates/project/app/events/subscribers/user_register.go.tpl +++ b/templates/project/app/events/subscribers/user_register.go.tpl @@ -3,18 +3,22 @@ package subscribers import ( "encoding/json" - "go.ipao.vip/atom/contracts" "{{.ModuleName}}/app/events" "{{.ModuleName}}/app/events/publishers" + "{{.ModuleName}}/providers/event" "github.com/ThreeDotsLabs/watermill/message" "github.com/sirupsen/logrus" + "go.ipao.vip/atom/contracts" ) var _ contracts.EventHandler = (*UserRegister)(nil) // @provider(event) type UserRegister struct { + event.DefaultChannel + event.DefaultPublishTo + log *logrus.Entry `inject:"false"` } @@ -23,11 +27,6 @@ func (e *UserRegister) Prepare() error { return nil } -// PublishToTopic implements contracts.EventHandler. -func (e *UserRegister) PublishToTopic() string { - return events.TopicProcessed -} - // Topic implements contracts.EventHandler. func (e *UserRegister) Topic() string { return events.TopicUserRegister diff --git a/templates/project/app/events/subscribers/utils.go.tpl b/templates/project/app/events/subscribers/utils.go.tpl new file mode 100644 index 0000000..45419ef --- /dev/null +++ b/templates/project/app/events/subscribers/utils.go.tpl @@ -0,0 +1,24 @@ +package subscribers + +import ( + "encoding/json" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" +) + +func toMessage(event any) (*message.Message, error) { + b, err := json.Marshal(event) + if err != nil { + return nil, err + } + return message.NewMessage(watermill.NewUUID(), b), nil +} + +func toMessageList(event any) ([]*message.Message, error) { + m, err := toMessage(event) + if err != nil { + return nil, err + } + return []*message.Message{m}, nil +} diff --git a/templates/project/providers/cmux/config.go.tpl b/templates/project/providers/cmux/config.go.tpl index b0895d2..d9a1c92 100644 --- a/templates/project/providers/cmux/config.go.tpl +++ b/templates/project/providers/cmux/config.go.tpl @@ -2,10 +2,13 @@ package cmux import ( "fmt" + "net" + "time" "{{.ModuleName}}/providers/grpc" "{{.ModuleName}}/providers/http" + log "github.com/sirupsen/logrus" "github.com/soheilhy/cmux" "go.ipao.vip/atom/container" "go.ipao.vip/atom/opt" @@ -39,23 +42,68 @@ type CMux struct { Http *http.Service Grpc *grpc.Grpc Mux cmux.CMux + Base net.Listener } func (c *CMux) Serve() error { - // grpcL := c.Mux.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) - // httpL := c.Mux.Match(cmux.HTTP1Fast()) - // httpL := c.Mux.Match(cmux.Any()) + // Protect against slowloris connections when sniffing protocol + // Safe even if SetReadTimeout is a no-op in the cmux version in use + c.Mux.SetReadTimeout(1 * time.Second) + + addr := "" + if c.Base != nil && c.Base.Addr() != nil { + addr = c.Base.Addr().String() + } + log.WithFields(log.Fields{ + "addr": addr, + }).Info("cmux starting") + + // Route classic HTTP/1.x traffic to the HTTP service httpL := c.Mux.Match(cmux.HTTP1Fast()) - grpcL := c.Mux.Match(cmux.Any()) + + // Route gRPC (HTTP/2 with content-type application/grpc) to the gRPC service. + // Additionally, send other HTTP/2 traffic to gRPC since Fiber (HTTP) does not serve HTTP/2. + grpcL := c.Mux.Match( + cmux.HTTP2HeaderField("content-type", "application/grpc"), + cmux.HTTP2(), + ) var eg errgroup.Group eg.Go(func() error { - return c.Grpc.ServeWithListener(grpcL) + log.WithField("addr", addr).Info("grpc serving via cmux") + err := c.Grpc.ServeWithListener(grpcL) + if err != nil { + log.WithError(err).Error("grpc server exited with error") + } else { + log.Info("grpc server exited") + } + return err }) eg.Go(func() error { - return c.Http.Listener(httpL) + log.WithField("addr", addr).Info("http serving via cmux") + err := c.Http.Listener(httpL) + if err != nil { + log.WithError(err).Error("http server exited with error") + } else { + log.Info("http server exited") + } + return err }) - return c.Mux.Serve() + // Run cmux dispatcher; wait for the first error from any goroutine + eg.Go(func() error { + err := c.Mux.Serve() + if err != nil { + log.WithError(err).Error("cmux exited with error") + } else { + log.Info("cmux exited") + } + return err + }) + err := eg.Wait() + if err == nil { + log.Info("cmux and sub-servers exited cleanly") + } + return err } diff --git a/templates/project/providers/cmux/provider.go.tpl b/templates/project/providers/cmux/provider.go.tpl index 708b3b8..47dd3ed 100644 --- a/templates/project/providers/cmux/provider.go.tpl +++ b/templates/project/providers/cmux/provider.go.tpl @@ -23,10 +23,15 @@ func Provide(opts ...opt.Option) error { return nil, err } - return &CMux{ + mux := &CMux{ Http: http, Grpc: grpc, Mux: cmux.New(l), - }, nil + Base: l, + } + // Ensure cmux stops accepting new connections on shutdown + container.AddCloseAble(func() { _ = l.Close() }) + + return mux, nil }, o.DiOptions()...) } diff --git a/templates/project/providers/event/channel.go.tpl b/templates/project/providers/event/channel.go.tpl new file mode 100644 index 0000000..f8f25f8 --- /dev/null +++ b/templates/project/providers/event/channel.go.tpl @@ -0,0 +1,30 @@ +package event + +import "go.ipao.vip/atom/contracts" + +const ( + Go contracts.Channel = "go" + Kafka contracts.Channel = "kafka" + Redis contracts.Channel = "redis" + Sql contracts.Channel = "sql" +) + +type DefaultPublishTo struct{} + +func (d *DefaultPublishTo) PublishTo() (contracts.Channel, string) { + return Go, "event:processed" +} + +type DefaultChannel struct{} + +func (d *DefaultChannel) Channel() contracts.Channel { return Go } + +// kafka +type KafkaChannel struct{} + +func (k *KafkaChannel) Channel() contracts.Channel { return Kafka } + +// kafka +type RedisChannel struct{} + +func (k *RedisChannel) Channel() contracts.Channel { return Redis } diff --git a/templates/project/providers/event/config.go.tpl b/templates/project/providers/event/config.go.tpl index ac20bd2..ba5509f 100644 --- a/templates/project/providers/event/config.go.tpl +++ b/templates/project/providers/event/config.go.tpl @@ -14,7 +14,7 @@ const DefaultPrefix = "Events" func DefaultProvider() container.ProviderContainer { return container.ProviderContainer{ - Provider: Provide, + Provider: ProvideChannel, Options: []opt.Option{ opt.Prefix(DefaultPrefix), }, @@ -22,15 +22,30 @@ func DefaultProvider() container.ProviderContainer { } type Config struct { - ConsumerGroup string + Sql *ConfigSql + Kafka *ConfigKafka + Redis *ConfigRedis +} - Brokers []string +type ConfigSql struct { + ConsumerGroup string +} + +type ConfigRedis struct { + ConsumerGroup string + Streams []string +} + +type ConfigKafka struct { + ConsumerGroup string + Brokers []string } type PubSub struct { - Publisher message.Publisher - Subscriber message.Subscriber - Router *message.Router + Router *message.Router + + publishers map[contracts.Channel]message.Publisher + subscribers map[contracts.Channel]message.Subscriber } func (ps *PubSub) Serve(ctx context.Context) error { @@ -40,15 +55,6 @@ func (ps *PubSub) Serve(ctx context.Context) error { return nil } -func (ps *PubSub) Handle( - handlerName string, - consumerTopic string, - publisherTopic string, - handler message.HandlerFunc, -) { - ps.Router.AddHandler(handlerName, consumerTopic, ps.Subscriber, publisherTopic, ps.Publisher, handler) -} - // publish func (ps *PubSub) Publish(e contracts.EventPublisher) error { if e == nil { @@ -61,5 +67,33 @@ func (ps *PubSub) Publish(e contracts.EventPublisher) error { } msg := message.NewMessage(watermill.NewUUID(), payload) - return ps.Publisher.Publish(e.Topic(), msg) + return ps.getPublisher(e.Channel()).Publish(e.Topic(), msg) +} + +// getPublisher returns the publisher for the specified channel. +func (ps *PubSub) getPublisher(channel contracts.Channel) message.Publisher { + if pub, ok := ps.publishers[channel]; ok { + return pub + } + return ps.publishers[Go] +} + +func (ps *PubSub) getSubscriber(channel contracts.Channel) message.Subscriber { + if sub, ok := ps.subscribers[channel]; ok { + return sub + } + return ps.subscribers[Go] +} + +func (ps *PubSub) Handle(handlerName string, sub contracts.EventHandler) { + publishToCh, publishToTopic := sub.PublishTo() + + ps.Router.AddHandler( + handlerName, + sub.Topic(), + ps.getSubscriber(sub.Channel()), + publishToTopic, + ps.getPublisher(publishToCh), + sub.Handler, + ) } diff --git a/templates/project/providers/event/provider.go.tpl b/templates/project/providers/event/provider.go.tpl index eb28d27..84cd980 100644 --- a/templates/project/providers/event/provider.go.tpl +++ b/templates/project/providers/event/provider.go.tpl @@ -1,14 +1,21 @@ package event import ( + sqlDB "database/sql" + "go.ipao.vip/atom/container" + "go.ipao.vip/atom/contracts" "go.ipao.vip/atom/opt" + "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka" + "github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream" + "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" + "github.com/redis/go-redis/v9" ) -func Provide(opts ...opt.Option) error { +func ProvideChannel(opts ...opt.Option) error { o := opt.New(opts...) var config Config if err := o.UnmarshalConfig(&config); err != nil { @@ -18,16 +25,85 @@ func Provide(opts ...opt.Option) error { return container.Container.Provide(func() (*PubSub, error) { logger := LogrusAdapter() + publishers := make(map[contracts.Channel]message.Publisher) + subscribers := make(map[contracts.Channel]message.Subscriber) + + // gochannel client := gochannel.NewGoChannel(gochannel.Config{}, logger) + publishers[Go] = client + subscribers[Go] = client + + // kafka + if config.Kafka != nil { + kafkaPublisher, err := kafka.NewPublisher(kafka.PublisherConfig{ + Brokers: config.Kafka.Brokers, + Marshaler: kafka.DefaultMarshaler{}, + }, logger) + if err != nil { + return nil, err + } + publishers[Kafka] = kafkaPublisher + + kafkaSubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{ + Brokers: config.Kafka.Brokers, + Unmarshaler: kafka.DefaultMarshaler{}, + ConsumerGroup: config.Kafka.ConsumerGroup, + }, logger) + if err != nil { + return nil, err + } + subscribers[Kafka] = kafkaSubscriber + } + + // redis + if config.Redis != nil { + var rdb redis.UniversalClient + redisSubscriber, err := redisstream.NewSubscriber(redisstream.SubscriberConfig{ + Client: rdb, + Unmarshaller: redisstream.DefaultMarshallerUnmarshaller{}, + ConsumerGroup: config.Redis.ConsumerGroup, + }, logger) + if err != nil { + return nil, err + } + subscribers[Redis] = redisSubscriber + + redisPublisher, err := redisstream.NewPublisher(redisstream.PublisherConfig{ + Client: rdb, + Marshaller: redisstream.DefaultMarshallerUnmarshaller{}, + }, logger) + if err != nil { + return nil, err + } + publishers[Redis] = redisPublisher + } + + if config.Sql == nil { + var db *sqlDB.DB + sqlPublisher, err := sql.NewPublisher(db, sql.PublisherConfig{ + SchemaAdapter: sql.DefaultPostgreSQLSchema{}, + AutoInitializeSchema: false, + }, logger) + if err != nil { + return nil, err + } + publishers[Sql] = sqlPublisher + + sqlSubscriber, err := sql.NewSubscriber(db, sql.SubscriberConfig{ + SchemaAdapter: sql.DefaultPostgreSQLSchema{}, + ConsumerGroup: config.Sql.ConsumerGroup, + }, logger) + if err != nil { + return nil, err + } + subscribers[Sql] = sqlSubscriber + } + router, err := message.NewRouter(message.RouterConfig{}, logger) if err != nil { return nil, err } - return &PubSub{ - Publisher: client, - Subscriber: client, - Router: router, - }, nil + return &PubSub{Router: router, publishers: publishers, subscribers: subscribers}, nil }, o.DiOptions()...) } diff --git a/templates/project/providers/event/provider_kafka.go.tpl b/templates/project/providers/event/provider_kafka.go.tpl deleted file mode 100644 index 5d660c4..0000000 --- a/templates/project/providers/event/provider_kafka.go.tpl +++ /dev/null @@ -1,49 +0,0 @@ -package event - -import ( - "go.ipao.vip/atom/container" - "go.ipao.vip/atom/opt" - - "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka" - "github.com/ThreeDotsLabs/watermill/message" -) - -func ProvideKafka(opts ...opt.Option) error { - o := opt.New(opts...) - var config Config - if err := o.UnmarshalConfig(&config); err != nil { - return err - } - - return container.Container.Provide(func() (*PubSub, error) { - logger := LogrusAdapter() - - publisher, err := kafka.NewPublisher(kafka.PublisherConfig{ - Brokers: config.Brokers, - Marshaler: kafka.DefaultMarshaler{}, - }, logger) - if err != nil { - return nil, err - } - - subscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{ - Brokers: config.Brokers, - Unmarshaler: kafka.DefaultMarshaler{}, - ConsumerGroup: config.ConsumerGroup, - }, logger) - if err != nil { - return nil, err - } - - router, err := message.NewRouter(message.RouterConfig{}, logger) - if err != nil { - return nil, err - } - - return &PubSub{ - Publisher: publisher, - Subscriber: subscriber, - Router: router, - }, nil - }, o.DiOptions()...) -} diff --git a/templates/project/providers/event/provider_redis.go.tpl b/templates/project/providers/event/provider_redis.go.tpl deleted file mode 100644 index 2fc08de..0000000 --- a/templates/project/providers/event/provider_redis.go.tpl +++ /dev/null @@ -1,50 +0,0 @@ -package event - -import ( - "go.ipao.vip/atom/container" - "go.ipao.vip/atom/opt" - - "github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/redis/go-redis/v9" -) - -func ProvideRedis(opts ...opt.Option) error { - o := opt.New(opts...) - var config Config - if err := o.UnmarshalConfig(&config); err != nil { - return err - } - - return container.Container.Provide(func(rdb redis.UniversalClient) (*PubSub, error) { - logger := LogrusAdapter() - - subscriber, err := redisstream.NewSubscriber(redisstream.SubscriberConfig{ - Client: rdb, - Unmarshaller: redisstream.DefaultMarshallerUnmarshaller{}, - ConsumerGroup: config.ConsumerGroup, - }, logger) - if err != nil { - return nil, err - } - - publisher, err := redisstream.NewPublisher(redisstream.PublisherConfig{ - Client: rdb, - Marshaller: redisstream.DefaultMarshallerUnmarshaller{}, - }, logger) - if err != nil { - return nil, err - } - - router, err := message.NewRouter(message.RouterConfig{}, logger) - if err != nil { - return nil, err - } - - return &PubSub{ - Publisher: publisher, - Subscriber: subscriber, - Router: router, - }, nil - }, o.DiOptions()...) -} diff --git a/templates/project/providers/event/provider_sql.go.tpl b/templates/project/providers/event/provider_sql.go.tpl deleted file mode 100644 index 033e2ef..0000000 --- a/templates/project/providers/event/provider_sql.go.tpl +++ /dev/null @@ -1,50 +0,0 @@ -package event - -import ( - sqlDB "database/sql" - - "go.ipao.vip/atom/container" - "go.ipao.vip/atom/opt" - - "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" - "github.com/ThreeDotsLabs/watermill/message" -) - -func ProvideSQL(opts ...opt.Option) error { - o := opt.New(opts...) - var config Config - if err := o.UnmarshalConfig(&config); err != nil { - return err - } - - return container.Container.Provide(func(db *sqlDB.DB) (*PubSub, error) { - logger := LogrusAdapter() - - publisher, err := sql.NewPublisher(db, sql.PublisherConfig{ - SchemaAdapter: sql.DefaultPostgreSQLSchema{}, - AutoInitializeSchema: false, - }, logger) - if err != nil { - return nil, err - } - - subscriber, err := sql.NewSubscriber(db, sql.SubscriberConfig{ - SchemaAdapter: sql.DefaultPostgreSQLSchema{}, - ConsumerGroup: config.ConsumerGroup, - }, logger) - if err != nil { - return nil, err - } - - router, err := message.NewRouter(message.RouterConfig{}, logger) - if err != nil { - return nil, err - } - - return &PubSub{ - Publisher: publisher, - Subscriber: subscriber, - Router: router, - }, nil - }, o.DiOptions()...) -}