package subscribers

import (
	"context"
	"encoding/json"

	"backend/app/events"
	"backend/app/events/publishers"
	"backend/app/http/users"
	"backend/database/fields"

	"git.ipao.vip/rogeecn/atom/contracts"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

// Compile-time check that *UserRegister satisfies contracts.EventHandler.
var _ contracts.EventHandler = (*UserRegister)(nil)

// @provider(event)
type UserRegister struct {
	// log is assigned in Prepare rather than supplied from outside.
	// NOTE(review): the inject:"false" tag presumably tells the provider
	// tooling to skip this field — confirm against the generator.
	log *logrus.Entry `inject:"false"`

	// userSvc is used by the handler to load users and update their status.
	userSvc *users.Service
}

// Prepare sets up the module-scoped logger entry. It always returns nil.
func (e *UserRegister) Prepare() error {
	e.log = logrus.WithField("module", "events.subscribers.user_register")
	return nil
}

// PublishToTopic implements contracts.EventHandler.
// It returns events.TopicProcessed.
func (e *UserRegister) PublishToTopic() string {
	return events.TopicProcessed
}

// Topic implements contracts.EventHandler.
// It returns events.TopicUserRegister, the topic this handler consumes.
func (e *UserRegister) Topic() string {
	return events.TopicUserRegister
}

// Handler implements contracts.EventHandler.
//
// It decodes msg.Payload as a publishers.UserRegister, logs the raw payload,
// and moves the referenced user from the pending status to the verified
// status. Users in any other status are left untouched. The handler never
// produces outgoing messages: the returned slice is always nil.
//
// An error is returned when the payload cannot be decoded, the user cannot
// be loaded, or the status update fails.
func (e *UserRegister) Handler(msg *message.Message) ([]*message.Message, error) {
	var payload publishers.UserRegister
	if err := json.Unmarshal(msg.Payload, &payload); err != nil {
		return nil, errors.Wrap(err, "failed to decode user register payload")
	}
	e.log.Infof("received event %s", msg.Payload)

	// Use the context carried by the message instead of a fresh background
	// context so cancellation and request-scoped values reach the service.
	if err := e.verifyPendingUser(msg.Context(), payload); err != nil {
		return nil, err
	}
	return nil, nil
}

// verifyPendingUser loads the user identified by payload.ID and, only if the
// user is still in the pending status, sets the status to verified.
func (e *UserRegister) verifyPendingUser(ctx context.Context, payload publishers.UserRegister) error {
	user, err := e.userSvc.GetUserByID(ctx, payload.ID)
	if err != nil {
		return errors.Wrapf(err, "failed to get user by id: %d", payload.ID)
	}

	// Nothing to do unless the user is still pending.
	if user.Status != fields.UserStatusPending {
		return nil
	}

	if err := e.userSvc.SetUserStatusByID(ctx, payload.ID, fields.UserStatusVerified); err != nil {
		return errors.Wrapf(err, "failed to set user status to %s by id: %d", fields.UserStatusVerified, payload.ID)
	}
	return nil
}