Files
quyun-v2/backend/providers/grpc/config.go

155 lines
4.0 KiB
Go

package grpc
import (
"fmt"
"net"
"time"
"go.ipao.vip/atom/container"
"go.ipao.vip/atom/opt"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
grpc_health_v1 "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
)
const DefaultPrefix = "Grpc"
func DefaultProvider() container.ProviderContainer {
return container.ProviderContainer{
Provider: Provide,
Options: []opt.Option{
opt.Prefix(DefaultPrefix),
},
}
}
type Config struct {
Host *string
Port uint
// EnableReflection enables grpc/reflection registration when true
EnableReflection *bool
// EnableHealth enables gRPC health service registration when true
EnableHealth *bool
// ShutdownTimeoutSeconds controls graceful stop timeout; 0 uses default
ShutdownTimeoutSeconds uint
}
func (cfg *Config) Address() string {
if cfg.Port == 0 {
cfg.Port = 8081
}
if cfg.Host == nil {
return fmt.Sprintf(":%d", cfg.Port)
}
return fmt.Sprintf("%s:%d", *cfg.Host, cfg.Port)
}
type Grpc struct {
Server *grpc.Server
config *Config
options []grpc.ServerOption
unaryInterceptors []grpc.UnaryServerInterceptor
streamInterceptors []grpc.StreamServerInterceptor
}
func (grpcServer *Grpc) Init() error {
// merge options and build interceptor chains if provided
var srvOpts []grpc.ServerOption
if len(grpcServer.unaryInterceptors) > 0 {
srvOpts = append(srvOpts, grpc.ChainUnaryInterceptor(grpcServer.unaryInterceptors...))
}
if len(grpcServer.streamInterceptors) > 0 {
srvOpts = append(srvOpts, grpc.ChainStreamInterceptor(grpcServer.streamInterceptors...))
}
srvOpts = append(srvOpts, grpcServer.options...)
grpcServer.Server = grpc.NewServer(srvOpts...)
// optional reflection and health
if grpcServer.config.EnableReflection != nil && *grpcServer.config.EnableReflection {
reflection.Register(grpcServer.Server)
}
if grpcServer.config.EnableHealth != nil && *grpcServer.config.EnableHealth {
hs := health.NewServer()
grpc_health_v1.RegisterHealthServer(grpcServer.Server, hs)
}
// graceful stop with timeout fallback to Stop()
container.AddCloseAble(func() {
timeout := grpcServer.config.ShutdownTimeoutSeconds
if timeout == 0 {
timeout = 10
}
done := make(chan struct{})
go func() {
grpcServer.Server.GracefulStop()
close(done)
}()
select {
case <-done:
// graceful stop finished
case <-time.After(time.Duration(int64(timeout)) * time.Second):
// timeout, force stop
grpcServer.Server.Stop()
}
})
return nil
}
// Serve
func (grpcServer *Grpc) Serve() error {
if grpcServer.Server == nil {
if err := grpcServer.Init(); err != nil {
return err
}
}
listener, err := net.Listen("tcp", grpcServer.config.Address())
if err != nil {
return fmt.Errorf("listen grpc address: %w", err)
}
if err := grpcServer.Server.Serve(listener); err != nil {
return fmt.Errorf("serve grpc listener: %w", err)
}
return nil
}
func (grpcServer *Grpc) ServeWithListener(listener net.Listener) error {
if err := grpcServer.Server.Serve(listener); err != nil {
return fmt.Errorf("serve grpc with listener: %w", err)
}
return nil
}
// UseOptions appends gRPC ServerOptions to be applied when constructing the server.
func (grpcServer *Grpc) UseOptions(opts ...grpc.ServerOption) {
grpcServer.options = append(grpcServer.options, opts...)
}
// UseUnaryInterceptors appends unary interceptors to be chained.
func (grpcServer *Grpc) UseUnaryInterceptors(inters ...grpc.UnaryServerInterceptor) {
grpcServer.unaryInterceptors = append(grpcServer.unaryInterceptors, inters...)
}
// UseStreamInterceptors appends stream interceptors to be chained.
func (grpcServer *Grpc) UseStreamInterceptors(inters ...grpc.StreamServerInterceptor) {
grpcServer.streamInterceptors = append(grpcServer.streamInterceptors, inters...)
}
// Reset clears all configured options and interceptors.
// Useful in tests to ensure isolation.
func (grpcServer *Grpc) Reset() {
grpcServer.options = nil
grpcServer.unaryInterceptors = nil
grpcServer.streamInterceptors = nil
}