init
This commit is contained in:
99
backend/providers/event/config.go
Normal file
99
backend/providers/event/config.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill"
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"go.ipao.vip/atom/container"
|
||||
"go.ipao.vip/atom/contracts"
|
||||
"go.ipao.vip/atom/opt"
|
||||
)
|
||||
|
||||
const DefaultPrefix = "Events"
|
||||
|
||||
func DefaultProvider() container.ProviderContainer {
|
||||
return container.ProviderContainer{
|
||||
Provider: ProvideChannel,
|
||||
Options: []opt.Option{
|
||||
opt.Prefix(DefaultPrefix),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Sql *ConfigSql
|
||||
Kafka *ConfigKafka
|
||||
Redis *ConfigRedis
|
||||
}
|
||||
|
||||
type ConfigSql struct {
|
||||
ConsumerGroup string
|
||||
}
|
||||
|
||||
type ConfigRedis struct {
|
||||
ConsumerGroup string
|
||||
Streams []string
|
||||
}
|
||||
|
||||
type ConfigKafka struct {
|
||||
ConsumerGroup string
|
||||
Brokers []string
|
||||
}
|
||||
|
||||
type PubSub struct {
|
||||
Router *message.Router
|
||||
|
||||
publishers map[contracts.Channel]message.Publisher
|
||||
subscribers map[contracts.Channel]message.Subscriber
|
||||
}
|
||||
|
||||
func (ps *PubSub) Serve(ctx context.Context) error {
|
||||
if err := ps.Router.Run(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// publish
|
||||
func (ps *PubSub) Publish(e contracts.EventPublisher) error {
|
||||
if e == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
payload, err := e.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msg := message.NewMessage(watermill.NewUUID(), payload)
|
||||
return ps.getPublisher(e.Channel()).Publish(e.Topic(), msg)
|
||||
}
|
||||
|
||||
// getPublisher returns the publisher for the specified channel.
|
||||
func (ps *PubSub) getPublisher(channel contracts.Channel) message.Publisher {
|
||||
if pub, ok := ps.publishers[channel]; ok {
|
||||
return pub
|
||||
}
|
||||
return ps.publishers[Go]
|
||||
}
|
||||
|
||||
func (ps *PubSub) getSubscriber(channel contracts.Channel) message.Subscriber {
|
||||
if sub, ok := ps.subscribers[channel]; ok {
|
||||
return sub
|
||||
}
|
||||
return ps.subscribers[Go]
|
||||
}
|
||||
|
||||
func (ps *PubSub) Handle(handlerName string, sub contracts.EventHandler) {
|
||||
publishToCh, publishToTopic := sub.PublishTo()
|
||||
|
||||
ps.Router.AddHandler(
|
||||
handlerName,
|
||||
sub.Topic(),
|
||||
ps.getSubscriber(sub.Channel()),
|
||||
publishToTopic,
|
||||
ps.getPublisher(publishToCh),
|
||||
sub.Handler,
|
||||
)
|
||||
}
|
||||
Reference in New Issue
Block a user