package grpc import ( "fmt" "net" "time" "go.ipao.vip/atom/container" "go.ipao.vip/atom/opt" "google.golang.org/grpc" "google.golang.org/grpc/health" grpc_health_v1 "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" ) const DefaultPrefix = "Grpc" func DefaultProvider() container.ProviderContainer { return container.ProviderContainer{ Provider: Provide, Options: []opt.Option{ opt.Prefix(DefaultPrefix), }, } } type Config struct { Host *string Port uint // EnableReflection enables grpc/reflection registration when true EnableReflection *bool // EnableHealth enables gRPC health service registration when true EnableHealth *bool // ShutdownTimeoutSeconds controls graceful stop timeout; 0 uses default ShutdownTimeoutSeconds uint } func (cfg *Config) Address() string { if cfg.Port == 0 { cfg.Port = 8081 } if cfg.Host == nil { return fmt.Sprintf(":%d", cfg.Port) } return fmt.Sprintf("%s:%d", *cfg.Host, cfg.Port) } type Grpc struct { Server *grpc.Server config *Config options []grpc.ServerOption unaryInterceptors []grpc.UnaryServerInterceptor streamInterceptors []grpc.StreamServerInterceptor } func (grpcServer *Grpc) Init() error { // merge options and build interceptor chains if provided var srvOpts []grpc.ServerOption if len(grpcServer.unaryInterceptors) > 0 { srvOpts = append(srvOpts, grpc.ChainUnaryInterceptor(grpcServer.unaryInterceptors...)) } if len(grpcServer.streamInterceptors) > 0 { srvOpts = append(srvOpts, grpc.ChainStreamInterceptor(grpcServer.streamInterceptors...)) } srvOpts = append(srvOpts, grpcServer.options...) grpcServer.Server = grpc.NewServer(srvOpts...) // optional reflection and health if grpcServer.config.EnableReflection != nil && *grpcServer.config.EnableReflection { reflection.Register(grpcServer.Server) } if grpcServer.config.EnableHealth != nil && *grpcServer.config.EnableHealth { hs := health.NewServer() grpc_health_v1.RegisterHealthServer(grpcServer.Server, hs) } // graceful stop with timeout fallback to Stop() container.AddCloseAble(func() { timeout := grpcServer.config.ShutdownTimeoutSeconds if timeout == 0 { timeout = 10 } done := make(chan struct{}) go func() { grpcServer.Server.GracefulStop() close(done) }() select { case <-done: // graceful stop finished case <-time.After(time.Duration(int64(timeout)) * time.Second): // timeout, force stop grpcServer.Server.Stop() } }) return nil } // Serve func (grpcServer *Grpc) Serve() error { if grpcServer.Server == nil { if err := grpcServer.Init(); err != nil { return err } } listener, err := net.Listen("tcp", grpcServer.config.Address()) if err != nil { return fmt.Errorf("listen grpc address: %w", err) } if err := grpcServer.Server.Serve(listener); err != nil { return fmt.Errorf("serve grpc listener: %w", err) } return nil } func (grpcServer *Grpc) ServeWithListener(listener net.Listener) error { if err := grpcServer.Server.Serve(listener); err != nil { return fmt.Errorf("serve grpc with listener: %w", err) } return nil } // UseOptions appends gRPC ServerOptions to be applied when constructing the server. func (grpcServer *Grpc) UseOptions(opts ...grpc.ServerOption) { grpcServer.options = append(grpcServer.options, opts...) } // UseUnaryInterceptors appends unary interceptors to be chained. func (grpcServer *Grpc) UseUnaryInterceptors(inters ...grpc.UnaryServerInterceptor) { grpcServer.unaryInterceptors = append(grpcServer.unaryInterceptors, inters...) } // UseStreamInterceptors appends stream interceptors to be chained. func (grpcServer *Grpc) UseStreamInterceptors(inters ...grpc.StreamServerInterceptor) { grpcServer.streamInterceptors = append(grpcServer.streamInterceptors, inters...) } // Reset clears all configured options and interceptors. // Useful in tests to ensure isolation. func (grpcServer *Grpc) Reset() { grpcServer.options = nil grpcServer.unaryInterceptors = nil grpcServer.streamInterceptors = nil }