diff --git a/templates/project/providers/grpc/config.go.tpl b/templates/project/providers/grpc/config.go.tpl index ffbcb13..b5dcbe2 100644 --- a/templates/project/providers/grpc/config.go.tpl +++ b/templates/project/providers/grpc/config.go.tpl @@ -3,11 +3,15 @@ package grpc import ( "fmt" "net" + "time" "go.ipao.vip/atom/container" "go.ipao.vip/atom/opt" "google.golang.org/grpc" + "google.golang.org/grpc/health" + grpc_health_v1 "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" ) const DefaultPrefix = "Grpc" @@ -24,9 +28,19 @@ func DefaultProvider() container.ProviderContainer { type Config struct { Host *string Port uint + // EnableReflection enables grpc/reflection registration when true + EnableReflection *bool + // EnableHealth enables gRPC health service registration when true + EnableHealth *bool + // ShutdownTimeoutSeconds controls graceful stop timeout; 0 uses default + ShutdownTimeoutSeconds uint } func (h *Config) Address() string { + if h.Port == 0 { + h.Port = 8081 + } + if h.Host == nil { return fmt.Sprintf(":%d", h.Port) } @@ -36,10 +50,65 @@ func (h *Config) Address() string { type Grpc struct { Server *grpc.Server config *Config + + options []grpc.ServerOption + unaryInterceptors []grpc.UnaryServerInterceptor + streamInterceptors []grpc.StreamServerInterceptor +} + +func (g *Grpc) Init() error { + // merge options and build interceptor chains if provided + var srvOpts []grpc.ServerOption + if len(g.unaryInterceptors) > 0 { + srvOpts = append(srvOpts, grpc.ChainUnaryInterceptor(g.unaryInterceptors...)) + } + if len(g.streamInterceptors) > 0 { + srvOpts = append(srvOpts, grpc.ChainStreamInterceptor(g.streamInterceptors...)) + } + srvOpts = append(srvOpts, g.options...) + + g.Server = grpc.NewServer(srvOpts...) + + // optional reflection and health + if g.config.EnableReflection != nil && *g.config.EnableReflection { + reflection.Register(g.Server) + } + if g.config.EnableHealth != nil && *g.config.EnableHealth { + hs := health.NewServer() + grpc_health_v1.RegisterHealthServer(g.Server, hs) + } + + // graceful stop with timeout fallback to Stop() + container.AddCloseAble(func() { + timeout := g.config.ShutdownTimeoutSeconds + if timeout == 0 { + timeout = 10 + } + done := make(chan struct{}) + go func() { + g.Server.GracefulStop() + close(done) + }() + select { + case <-done: + // graceful stop finished + case <-time.After(time.Duration(timeout) * time.Second): + // timeout, force stop + g.Server.Stop() + } + }) + + return nil } // Serve func (g *Grpc) Serve() error { + if g.Server == nil { + if err := g.Init(); err != nil { + return err + } + } + l, err := net.Listen("tcp", g.config.Address()) if err != nil { return err @@ -51,3 +120,26 @@ func (g *Grpc) Serve() error { func (g *Grpc) ServeWithListener(ln net.Listener) error { return g.Server.Serve(ln) } + +// UseOptions appends gRPC ServerOptions to be applied when constructing the server. +func (g *Grpc) UseOptions(opts ...grpc.ServerOption) { + g.options = append(g.options, opts...) +} + +// UseUnaryInterceptors appends unary interceptors to be chained. +func (g *Grpc) UseUnaryInterceptors(inters ...grpc.UnaryServerInterceptor) { + g.unaryInterceptors = append(g.unaryInterceptors, inters...) +} + +// UseStreamInterceptors appends stream interceptors to be chained. +func (g *Grpc) UseStreamInterceptors(inters ...grpc.StreamServerInterceptor) { + g.streamInterceptors = append(g.streamInterceptors, inters...) +} + +// Reset clears all configured options and interceptors. +// Useful in tests to ensure isolation. +func (g *Grpc) Reset() { + g.options = nil + g.unaryInterceptors = nil + g.streamInterceptors = nil +} diff --git a/templates/project/providers/grpc/options.md.raw b/templates/project/providers/grpc/options.md.raw new file mode 100644 index 0000000..99bb6af --- /dev/null +++ b/templates/project/providers/grpc/options.md.raw @@ -0,0 +1,460 @@ +# gRPC Server Options & Interceptors Examples + +本文件给出一些可直接拷贝使用的示例,配合本包提供的注册函数: + +- `UseOptions(opts ...grpc.ServerOption)` +- `UseUnaryInterceptors(inters ...grpc.UnaryServerInterceptor)` +- `UseStreamInterceptors(inters ...grpc.StreamServerInterceptor)` + +建议在应用启动或 Provider 初始化阶段调用(在 gRPC 服务构造前)。 + +> 导入建议: +> +> ```go +> import ( +> pgrpc "test/providers/grpc" // 本包 +> grpc "google.golang.org/grpc" // 避免命名冲突 +> ) +> ``` + +## ServerOption 示例 + +最大消息大小限制: + +```go +pgrpc.UseOptions( + grpc.MaxRecvMsgSize(32<<20), // 32 MiB + grpc.MaxSendMsgSize(32<<20), // 32 MiB +) +``` + +限制最大并发流(对 HTTP/2 流并发施加上限): + +```go +pgrpc.UseOptions( + grpc.MaxConcurrentStreams(1024), +) +``` + +Keepalive 参数(需要 keepalive 包): + +```go +import ( + "time" + "google.golang.org/grpc/keepalive" +) + +pgrpc.UseOptions( + grpc.KeepaliveParams(keepalive.ServerParameters{ + MaxConnectionIdle: 5 * time.Minute, + MaxConnectionAge: 30 * time.Minute, + MaxConnectionAgeGrace: 5 * time.Minute, + Time: 2 * time.Minute, // ping 间隔 + Timeout: 20 * time.Second, + }), + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: 1 * time.Minute, // 客户端 ping 最小间隔 + PermitWithoutStream: true, + }), +) +``` + +## UnaryServerInterceptor 示例 + +简单日志拦截器(logrus): + +```go +import ( + "context" + "time" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +func LoggingUnaryInterceptor( + ctx context.Context, + req any, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, +) (any, error) { + start := time.Now() + resp, err := handler(ctx, req) + dur := time.Since(start) + entry := log.WithFields(log.Fields{ + "grpc.method": info.FullMethod, + "grpc.duration_ms": dur.Milliseconds(), + }) + if err != nil { + entry.WithError(err).Warn("grpc unary request failed") + } else { + entry.Info("grpc unary request finished") + } + return resp, err +} + +// 注册 +pgrpc.UseUnaryInterceptors(LoggingUnaryInterceptor) +``` + +恢复拦截器(panic 捕获): + +```go +import ( + "context" + "fmt" + "runtime/debug" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/status" + "google.golang.org/grpc/codes" +) + +func RecoveryUnaryInterceptor( + ctx context.Context, + req any, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, +) (any, error) { + defer func() { + if r := recover(); r != nil { + log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack()) + } + }() + return handler(ctx, req) +} + +// 或者向客户端返回内部错误: +func RecoveryUnaryInterceptorWithError( + ctx context.Context, + req any, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, +) (any, error) { + defer func() { + if r := recover(); r != nil { + log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack()) + } + }() + resp, err := handler(ctx, req) + if rec := recover(); rec != nil { + return nil, status.Error(codes.Internal, fmt.Sprint(rec)) + } + return resp, err +} + +pgrpc.UseUnaryInterceptors(RecoveryUnaryInterceptor) +``` + +链式调用(与其它拦截器共同使用): + +```go +pgrpc.UseUnaryInterceptors(LoggingUnaryInterceptor, RecoveryUnaryInterceptor) +``` + +## StreamServerInterceptor 示例 + +简单日志拦截器: + +```go +import ( + "time" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +func LoggingStreamInterceptor( + srv any, + ss grpc.ServerStream, + info *grpc.StreamServerInfo, + handler grpc.StreamHandler, +) error { + start := time.Now() + err := handler(srv, ss) + dur := time.Since(start) + entry := log.WithFields(log.Fields{ + "grpc.method": info.FullMethod, + "grpc.is_client_stream": info.IsClientStream, + "grpc.is_server_stream": info.IsServerStream, + "grpc.duration_ms": dur.Milliseconds(), + }) + if err != nil { + entry.WithError(err).Warn("grpc stream request failed") + } else { + entry.Info("grpc stream request finished") + } + return err +} + +pgrpc.UseStreamInterceptors(LoggingStreamInterceptor) +``` + +恢复拦截器(panic 捕获): + +```go +import ( + "runtime/debug" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +func RecoveryStreamInterceptor( + srv any, + ss grpc.ServerStream, + info *grpc.StreamServerInfo, + handler grpc.StreamHandler, +) (err error) { + defer func() { + if r := recover(); r != nil { + log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack()) + } + }() + return handler(srv, ss) +} + +pgrpc.UseStreamInterceptors(RecoveryStreamInterceptor) +``` + +## 组合与测试小贴士 + +- 可以多次调用 `UseOptions/UseUnaryInterceptors/UseStreamInterceptors`,最终会在服务构造时链式生效。 +- 单元测试中如需隔离,建议使用 `pgrpc.Reset()` 清理已注册的选项和拦截器。 +- 若要启用健康检查或反射,请在配置中设置: + - `EnableHealth = true` + - `EnableReflection = true` + +## 更多 ServerOption 示例 + +TLS(服务端或 mTLS): + +```go +import ( + "crypto/tls" + grpcCredentials "google.golang.org/grpc/credentials" +) + +// 使用自定义 tls.Config(可配置 mTLS) +var tlsConfig *tls.Config = &tls.Config{ /* ... */ } +pgrpc.UseOptions( + grpc.Creds(grpcCredentials.NewTLS(tlsConfig)), +) + +// 或者从证书文件加载(仅服务端 TLS) +// pgrpc.UseOptions(grpc.Creds(grpcCredentials.NewServerTLSFromFile(certFile, keyFile))) +``` + +OpenTelemetry 统计/追踪(StatsHandler): + +```go +import ( + otelgrpc "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" +) + +pgrpc.UseOptions( + grpc.StatsHandler(otelgrpc.NewServerHandler()), +) +``` + +流控/缓冲区调优: + +```go +pgrpc.UseOptions( + grpc.InitialWindowSize(1<<20), // 每个流初始窗口(字节) + grpc.InitialConnWindowSize(1<<21), // 连接级窗口 + grpc.ReadBufferSize(64<<10), // 读缓冲 64 KiB + grpc.WriteBufferSize(64<<10), // 写缓冲 64 KiB +) +``` + +连接超时与 Tap Handle(早期拦截): + +```go +import ( + "context" + "time" + "google.golang.org/grpc/tap" +) + +pgrpc.UseOptions( + grpc.ConnectionTimeout(5 * time.Second), + grpc.InTapHandle(func(ctx context.Context, info *tap.Info) (context.Context, error) { + // 在真正的 RPC 处理前进行快速拒绝(如黑名单、IP 检查等) + return ctx, nil + }), +) +``` + +未知服务处理与工作池: + +```go +pgrpc.UseOptions( + grpc.UnknownServiceHandler(func(srv any, stream grpc.ServerStream) error { + // 统一记录未注册方法,或返回自定义错误 + return status.Error(codes.Unimplemented, "unknown service/method") + }), + grpc.NumStreamWorkers(8), // 针对 CPU 密集流处理的工作池 +) +``` + +## 更多 Unary 拦截器示例 + +基于 Metadata 的鉴权: + +```go +import ( + "context" + "strings" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/grpc/codes" +) + +func AuthUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + md, _ := metadata.FromIncomingContext(ctx) + token := "" + if vals := md.Get("authorization"); len(vals) > 0 { + token = vals[0] + } + if token == "" || !strings.HasPrefix(strings.ToLower(token), "bearer ") { + return nil, status.Error(codes.Unauthenticated, "missing or invalid token") + } + // TODO: 验证 JWT / API-Key + return handler(ctx, req) +} + +pgrpc.UseUnaryInterceptors(AuthUnaryInterceptor) +``` + +方法粒度速率限制(x/time/rate): + +```go +import ( + "context" + "sync" + "golang.org/x/time/rate" + "google.golang.org/grpc/status" + "google.golang.org/grpc/codes" +) + +var ( + rlmu sync.RWMutex + rlm = map[string]*rate.Limiter{} +) + +func limitFor(method string) *rate.Limiter { + rlmu.RLock(); l := rlm[method]; rlmu.RUnlock() + if l != nil { return l } + rlmu.Lock(); defer rlmu.Unlock() + if rlm[method] == nil { rlm[method] = rate.NewLimiter(100, 200) } // 100 rps, burst 200 + return rlm[method] +} + +func RateLimitUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + l := limitFor(info.FullMethod) + if !l.Allow() { + return nil, status.Error(codes.ResourceExhausted, "rate limited") + } + return handler(ctx, req) +} + +pgrpc.UseUnaryInterceptors(RateLimitUnaryInterceptor) +``` + +Request-ID 注入与日志关联: + +```go +import ( + "context" + "github.com/google/uuid" + "google.golang.org/grpc/metadata" +) + +type ctxKey string +const requestIDKey ctxKey = "request_id" + +func RequestIDUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + md, _ := metadata.FromIncomingContext(ctx) + var rid string + if v := md.Get("x-request-id"); len(v) > 0 { rid = v[0] } + if rid == "" { rid = uuid.New().String() } + ctx = context.WithValue(ctx, requestIDKey, rid) + return handler(ctx, req) +} + +pgrpc.UseUnaryInterceptors(RequestIDUnaryInterceptor) +``` + +无超时/超长请求治理(默认超时/拒绝超长): + +```go +import ( + "context" + "time" + "google.golang.org/grpc/status" + "google.golang.org/grpc/codes" +) + +func DeadlineUnaryInterceptor(max time.Duration) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + if _, ok := ctx.Deadline(); !ok { // 未设置超时 + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, max) + defer cancel() + } + resp, err := handler(ctx, req) + if err != nil && ctx.Err() == context.DeadlineExceeded { + return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded") + } + return resp, err + } +} + +pgrpc.UseUnaryInterceptors(DeadlineUnaryInterceptor(5*time.Second)) +``` + +## 更多 Stream 拦截器示例 + +基于 Metadata 的鉴权(流): + +```go +import ( + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/grpc/codes" +) + +func AuthStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + md, _ := metadata.FromIncomingContext(ss.Context()) + if len(md.Get("authorization")) == 0 { + return status.Error(codes.Unauthenticated, "missing token") + } + return handler(srv, ss) +} + +pgrpc.UseStreamInterceptors(AuthStreamInterceptor) +``` + +流级限流(示例:简单 Allow 检查): + +```go +func RateLimitStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + l := limitFor(info.FullMethod) + if !l.Allow() { + return status.Error(codes.ResourceExhausted, "rate limited") + } + return handler(srv, ss) +} + +pgrpc.UseStreamInterceptors(RateLimitStreamInterceptor) +``` + +## 压缩与编码 + +注册 gzip 压缩器后,客户端可按需协商使用(新版本通过 encoding 注册): + +```go +import ( + _ "google.golang.org/grpc/encoding/gzip" // 注册 gzip 编解码器 +) + +// 仅需 import 即可,无额外 ServerOption +``` diff --git a/templates/project/providers/grpc/provider.go.tpl b/templates/project/providers/grpc/provider.go.tpl index b517f76..b76ff40 100644 --- a/templates/project/providers/grpc/provider.go.tpl +++ b/templates/project/providers/grpc/provider.go.tpl @@ -3,8 +3,6 @@ package grpc import ( "go.ipao.vip/atom/container" "go.ipao.vip/atom/opt" - - "google.golang.org/grpc" ) func Provide(opts ...opt.Option) error { @@ -13,15 +11,8 @@ func Provide(opts ...opt.Option) error { if err := o.UnmarshalConfig(&config); err != nil { return err } + return container.Container.Provide(func() (*Grpc, error) { - server := grpc.NewServer() - - grpc := &Grpc{ - Server: server, - config: &config, - } - container.AddCloseAble(grpc.Server.GracefulStop) - - return grpc, nil + return &Grpc{config: &config}, nil }, o.DiOptions()...) }