diff --git a/templates/project/app/events/event_demo.go.tpl b/templates/project/app/events/event_demo.go.tpl index 57de3d7..35a9f75 100644 --- a/templates/project/app/events/event_demo.go.tpl +++ b/templates/project/app/events/event_demo.go.tpl @@ -2,70 +2,51 @@ package events import ( "encoding/json" - "fmt" - "time" "git.ipao.vip/rogeecn/atom/contracts" - "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/sirupsen/logrus" ) -var _ contracts.EventHandler = (*UserRegister)(nil) - -type Event struct { - ID int `json:"id"` -} - -type ProcessedEvent struct { - ProcessedID int `json:"processed_id"` - Time time.Time `json:"time"` -} +var ( + _ contracts.EventHandler = (*UserRegister)(nil) + _ contracts.EventPublisher = (*UserRegister)(nil) +) // @provider(event) type UserRegister struct { - log *logrus.Entry `inject:"false"` + log *logrus.Entry `inject:"false" json:"-"` + ID int64 `json:"id"` } -func (u *UserRegister) Prepare() error { +func (e *UserRegister) Prepare() error { return nil } -// Handler implements contracts.EventHandler. -func (u *UserRegister) Handler(msg *message.Message) ([]*message.Message, error) { - consumedPayload := Event{} - err := json.Unmarshal(msg.Payload, &consumedPayload) - if err != nil { - // When a handler returns an error, the default behavior is to send a Nack (negative-acknowledgement). - // The message will be processed again. - // - // You can change the default behaviour by using middlewares, like Retry or PoisonQueue. - // You can also implement your own middleware. - return nil, err - } - - fmt.Printf("received event %+v\n", consumedPayload) - - newPayload, err := json.Marshal(ProcessedEvent{ - ProcessedID: consumedPayload.ID, - Time: time.Now(), - }) - if err != nil { - return nil, err - } - - newMessage := message.NewMessage(watermill.NewUUID(), newPayload) - - return nil, nil - return []*message.Message{newMessage}, nil +// Marshal implements contracts.EventPublisher. +func (e *UserRegister) Marshal() ([]byte, error) { + return json.Marshal(e) } // PublishToTopic implements contracts.EventHandler. -func (u *UserRegister) PublishToTopic() string { - return "event:processed" +func (e *UserRegister) PublishToTopic() string { + return TopicProcessed.String() } // Topic implements contracts.EventHandler. -func (u *UserRegister) Topic() string { - return "event:user-register" +func (e *UserRegister) Topic() string { + return TopicUserRegister.String() +} + +// Handler implements contracts.EventHandler. +func (e *UserRegister) Handler(msg *message.Message) ([]*message.Message, error) { + var payload UserRegister + err := json.Unmarshal(msg.Payload, &payload) + if err != nil { + return nil, err + } + + e.log.Infof("received event %+v\n", payload) + + return nil, nil } diff --git a/templates/project/app/events/topics.go.tpl b/templates/project/app/events/topics.go.tpl new file mode 100644 index 0000000..bf9c96a --- /dev/null +++ b/templates/project/app/events/topics.go.tpl @@ -0,0 +1,10 @@ +package events + +// swagger:enum Topic +// ENUM( +// +// Processed = "event:processed" +// UserRegister = "user:register" +// +// ) +type Topic string diff --git a/templates/project/app/service/event/event.go.tpl b/templates/project/app/service/event/event.go.tpl index 28dca83..8fbe71a 100644 --- a/templates/project/app/service/event/event.go.tpl +++ b/templates/project/app/service/event/event.go.tpl @@ -6,7 +6,7 @@ import ( "{{.ModuleName}}/app/events" "{{.ModuleName}}/app/service" "{{.ModuleName}}/providers/app" - providerEvents "{{.ModuleName}}/providers/events" + "{{.ModuleName}}/providers/event" "{{.ModuleName}}/providers/postgres" "git.ipao.vip/rogeecn/atom" @@ -41,7 +41,7 @@ type Service struct { dig.In App *app.Config - PubSub *providerEvents.PubSub + PubSub *event.PubSub Initials []contracts.Initial `group:"initials"` } diff --git a/templates/project/app/service/service.go.tpl b/templates/project/app/service/service.go.tpl index 2829ec2..7054622 100644 --- a/templates/project/app/service/service.go.tpl +++ b/templates/project/app/service/service.go.tpl @@ -2,7 +2,7 @@ package service import ( "{{.ModuleName}}/providers/app" - "{{.ModuleName}}/providers/events" + "{{.ModuleName}}/providers/event" "git.ipao.vip/rogeecn/atom/container" ) @@ -10,6 +10,6 @@ import ( func Default(providers ...container.ProviderContainer) container.Providers { return append(container.Providers{ app.DefaultProvider(), - events.DefaultProvider(), + event.DefaultProvider(), }, providers...) } diff --git a/templates/project/providers/events/config.go.tpl b/templates/project/providers/event/config.go.tpl similarity index 73% rename from templates/project/providers/events/config.go.tpl rename to templates/project/providers/event/config.go.tpl index 69c79a2..1f98a8f 100644 --- a/templates/project/providers/events/config.go.tpl +++ b/templates/project/providers/event/config.go.tpl @@ -1,10 +1,11 @@ -package events +package event import ( "context" "git.ipao.vip/rogeecn/atom/container" "git.ipao.vip/rogeecn/atom/utils/opt" + "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" ) @@ -46,3 +47,18 @@ func (ps *PubSub) Handle( ) { ps.Router.AddHandler(handlerName, consumerTopic, ps.Subscriber, publisherTopic, ps.Publisher, handler) } + +// publish +func (ps *PubSub) Publish(e contracts.EventPublisher) error { + if e == nil { + return nil + } + + payload, err := e.Marshal() + if err != nil { + return err + } + + msg := message.NewMessage(watermill.NewUUID(), payload) + return ps.Publisher.Publish(e.Topic(), msg) +} diff --git a/templates/project/providers/events/logrus_adapter.go.tpl b/templates/project/providers/event/logrus_adapter.go.tpl similarity index 99% rename from templates/project/providers/events/logrus_adapter.go.tpl rename to templates/project/providers/event/logrus_adapter.go.tpl index a47ab64..b4cdd41 100644 --- a/templates/project/providers/events/logrus_adapter.go.tpl +++ b/templates/project/providers/event/logrus_adapter.go.tpl @@ -1,4 +1,4 @@ -package events +package event import ( "github.com/ThreeDotsLabs/watermill" diff --git a/templates/project/providers/events/provider.go.tpl b/templates/project/providers/event/provider.go.tpl similarity index 97% rename from templates/project/providers/events/provider.go.tpl rename to templates/project/providers/event/provider.go.tpl index c228926..f189156 100644 --- a/templates/project/providers/events/provider.go.tpl +++ b/templates/project/providers/event/provider.go.tpl @@ -1,4 +1,4 @@ -package events +package event import ( "git.ipao.vip/rogeecn/atom/container" diff --git a/templates/project/providers/events/provider_kafka.go.tpl b/templates/project/providers/event/provider_kafka.go.tpl similarity index 98% rename from templates/project/providers/events/provider_kafka.go.tpl rename to templates/project/providers/event/provider_kafka.go.tpl index 6f58f97..52e4a31 100644 --- a/templates/project/providers/events/provider_kafka.go.tpl +++ b/templates/project/providers/event/provider_kafka.go.tpl @@ -1,4 +1,4 @@ -package events +package event import ( "git.ipao.vip/rogeecn/atom/container" diff --git a/templates/project/providers/events/provider_redis.go.tpl b/templates/project/providers/event/provider_redis.go.tpl similarity index 98% rename from templates/project/providers/events/provider_redis.go.tpl rename to templates/project/providers/event/provider_redis.go.tpl index 0d48bb4..e6fd5bb 100644 --- a/templates/project/providers/events/provider_redis.go.tpl +++ b/templates/project/providers/event/provider_redis.go.tpl @@ -1,4 +1,4 @@ -package events +package event import ( "git.ipao.vip/rogeecn/atom/container" diff --git a/templates/project/providers/events/provider_sql.go.tpl b/templates/project/providers/event/provider_sql.go.tpl similarity index 98% rename from templates/project/providers/events/provider_sql.go.tpl rename to templates/project/providers/event/provider_sql.go.tpl index b862313..e48e520 100644 --- a/templates/project/providers/events/provider_sql.go.tpl +++ b/templates/project/providers/event/provider_sql.go.tpl @@ -1,4 +1,4 @@ -package events +package event import ( sqlDB "database/sql"