# gRPC Server Options & Interceptors Examples 本文件给出一些可直接拷贝使用的示例,配合本包提供的注册函数: - `UseOptions(opts ...grpc.ServerOption)` - `UseUnaryInterceptors(inters ...grpc.UnaryServerInterceptor)` - `UseStreamInterceptors(inters ...grpc.StreamServerInterceptor)` 建议在应用启动或 Provider 初始化阶段调用(在 gRPC 服务构造前)。 > 导入建议: > > ```go > import ( > pgrpc "test/providers/grpc" // 本包 > grpc "google.golang.org/grpc" // 避免命名冲突 > ) > ``` ## ServerOption 示例 最大消息大小限制: ```go pgrpc.UseOptions( grpc.MaxRecvMsgSize(32<<20), // 32 MiB grpc.MaxSendMsgSize(32<<20), // 32 MiB ) ``` 限制最大并发流(对 HTTP/2 流并发施加上限): ```go pgrpc.UseOptions( grpc.MaxConcurrentStreams(1024), ) ``` Keepalive 参数(需要 keepalive 包): ```go import ( "time" "google.golang.org/grpc/keepalive" ) pgrpc.UseOptions( grpc.KeepaliveParams(keepalive.ServerParameters{ MaxConnectionIdle: 5 * time.Minute, MaxConnectionAge: 30 * time.Minute, MaxConnectionAgeGrace: 5 * time.Minute, Time: 2 * time.Minute, // ping 间隔 Timeout: 20 * time.Second, }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: 1 * time.Minute, // 客户端 ping 最小间隔 PermitWithoutStream: true, }), ) ``` ## UnaryServerInterceptor 示例 简单日志拦截器(logrus): ```go import ( "context" "time" log "github.com/sirupsen/logrus" "google.golang.org/grpc" ) func LoggingUnaryInterceptor( ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (any, error) { start := time.Now() resp, err := handler(ctx, req) dur := time.Since(start) entry := log.WithFields(log.Fields{ "grpc.method": info.FullMethod, "grpc.duration_ms": dur.Milliseconds(), }) if err != nil { entry.WithError(err).Warn("grpc unary request failed") } else { entry.Info("grpc unary request finished") } return resp, err } // 注册 pgrpc.UseUnaryInterceptors(LoggingUnaryInterceptor) ``` 恢复拦截器(panic 捕获): ```go import ( "context" "fmt" "runtime/debug" log "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/status" "google.golang.org/grpc/codes" ) func RecoveryUnaryInterceptor( ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (any, error) { defer func() { if r := recover() ; r != nil { log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack()) } }() return handler(ctx, req) } // 或者向客户端返回内部错误: func RecoveryUnaryInterceptorWithError( ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (any, error) { defer func() { if r := recover() ; r != nil { log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack()) } }() resp, err := handler(ctx, req) if rec := recover() ; rec != nil { return nil, status.Error(codes.Internal, fmt.Sprint(rec)) } return resp, err } pgrpc.UseUnaryInterceptors(RecoveryUnaryInterceptor) ``` 链式调用(与其它拦截器共同使用): ```go pgrpc.UseUnaryInterceptors(LoggingUnaryInterceptor, RecoveryUnaryInterceptor) ``` ## StreamServerInterceptor 示例 简单日志拦截器: ```go import ( "time" log "github.com/sirupsen/logrus" "google.golang.org/grpc" ) func LoggingStreamInterceptor( srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, ) error { start := time.Now() err := handler(srv, ss) dur := time.Since(start) entry := log.WithFields(log.Fields{ "grpc.method": info.FullMethod, "grpc.is_client_stream": info.IsClientStream, "grpc.is_server_stream": info.IsServerStream, "grpc.duration_ms": dur.Milliseconds(), }) if err != nil { entry.WithError(err).Warn("grpc stream request failed") } else { entry.Info("grpc stream request finished") } return err } pgrpc.UseStreamInterceptors(LoggingStreamInterceptor) ``` 恢复拦截器(panic 捕获): ```go import ( "runtime/debug" log "github.com/sirupsen/logrus" "google.golang.org/grpc" ) func RecoveryStreamInterceptor( srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, ) (err error) { defer func() { if r := recover() ; r != nil { log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack()) } }() return handler(srv, ss) } pgrpc.UseStreamInterceptors(RecoveryStreamInterceptor) ``` ## 组合与测试小贴士 - 可以多次调用 `UseOptions/UseUnaryInterceptors/UseStreamInterceptors`,最终会在服务构造时链式生效。 - 单元测试中如需隔离,建议使用 `pgrpc.Reset()` 清理已注册的选项和拦截器。 - 若要启用健康检查或反射,请在配置中设置: - `EnableHealth = true` - `EnableReflection = true` ## 更多 ServerOption 示例 TLS(服务端或 mTLS): ```go import ( "crypto/tls" grpcCredentials "google.golang.org/grpc/credentials" ) // 使用自定义 tls.Config(可配置 mTLS) var tlsConfig *tls.Config = &tls.Config{ /* ... */ } pgrpc.UseOptions( grpc.Creds(grpcCredentials.NewTLS(tlsConfig)), ) // 或者从证书文件加载(仅服务端 TLS) // pgrpc.UseOptions(grpc.Creds(grpcCredentials.NewServerTLSFromFile(certFile, keyFile))) ``` OpenTelemetry 统计/追踪(StatsHandler): ```go import ( otelgrpc "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" ) pgrpc.UseOptions( grpc.StatsHandler(otelgrpc.NewServerHandler()), ) ``` 流控/缓冲区调优: ```go pgrpc.UseOptions( grpc.InitialWindowSize(1<<20), // 每个流初始窗口(字节) grpc.InitialConnWindowSize(1<<21), // 连接级窗口 grpc.ReadBufferSize(64<<10), // 读缓冲 64 KiB grpc.WriteBufferSize(64<<10), // 写缓冲 64 KiB ) ``` 连接超时与 Tap Handle(早期拦截): ```go import ( "context" "time" "google.golang.org/grpc/tap" ) pgrpc.UseOptions( grpc.ConnectionTimeout(5 * time.Second), grpc.InTapHandle(func(ctx context.Context, info *tap.Info) (context.Context, error) { // 在真正的 RPC 处理前进行快速拒绝(如黑名单、IP 检查等) return ctx, nil }), ) ``` 未知服务处理与工作池: ```go pgrpc.UseOptions( grpc.UnknownServiceHandler(func(srv any, stream grpc.ServerStream) error { // 统一记录未注册方法,或返回自定义错误 return status.Error(codes.Unimplemented, "unknown service/method") }), grpc.NumStreamWorkers(8), // 针对 CPU 密集流处理的工作池 ) ``` ## 更多 Unary 拦截器示例 基于 Metadata 的鉴权: ```go import ( "context" "strings" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/grpc/codes" ) func AuthUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { md, _ := metadata.FromIncomingContext(ctx) token := "" if vals := md.Get("authorization"); len(vals) > 0 { token = vals[0] } if token == "" || !strings.HasPrefix(strings.ToLower(token), "bearer ") { return nil, status.Error(codes.Unauthenticated, "missing or invalid token") } // TODO: 验证 JWT / API-Key return handler(ctx, req) } pgrpc.UseUnaryInterceptors(AuthUnaryInterceptor) ``` 方法粒度速率限制(x/time/rate): ```go import ( "context" "sync" "golang.org/x/time/rate" "google.golang.org/grpc/status" "google.golang.org/grpc/codes" ) var ( rlmu sync.RWMutex rlm = map[string]*rate.Limiter{} ) func limitFor(method string) *rate.Limiter { rlmu.RLock() ; l := rlm[method]; rlmu.RUnlock() if l != nil { return l } rlmu.Lock() ; defer rlmu.Unlock() if rlm[method] == nil { rlm[method] = rate.NewLimiter(100, 200) } // 100 rps, burst 200 return rlm[method] } func RateLimitUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { l := limitFor(info.FullMethod) if !l.Allow() { return nil, status.Error(codes.ResourceExhausted, "rate limited") } return handler(ctx, req) } pgrpc.UseUnaryInterceptors(RateLimitUnaryInterceptor) ``` Request-ID 注入与日志关联: ```go import ( "context" "github.com/google/uuid" "google.golang.org/grpc/metadata" ) type ctxKey string const requestIDKey ctxKey = "request_id" func RequestIDUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { md, _ := metadata.FromIncomingContext(ctx) var rid string if v := md.Get("x-request-id"); len(v) > 0 { rid = v[0] } if rid == "" { rid = uuid.New().String() } ctx = context.WithValue(ctx, requestIDKey, rid) return handler(ctx, req) } pgrpc.UseUnaryInterceptors(RequestIDUnaryInterceptor) ``` 无超时/超长请求治理(默认超时/拒绝超长): ```go import ( "context" "time" "google.golang.org/grpc/status" "google.golang.org/grpc/codes" ) func DeadlineUnaryInterceptor(max time.Duration) grpc.UnaryServerInterceptor { return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { if _, ok := ctx.Deadline() ; !ok { // 未设置超时 var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, max) defer cancel() } resp, err := handler(ctx, req) if err != nil && ctx.Err() == context.DeadlineExceeded { return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded") } return resp, err } } pgrpc.UseUnaryInterceptors(DeadlineUnaryInterceptor(5*time.Second)) ``` ## 更多 Stream 拦截器示例 基于 Metadata 的鉴权(流): ```go import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/grpc/codes" ) func AuthStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { md, _ := metadata.FromIncomingContext(ss.Context()) if len(md.Get("authorization")) == 0 { return status.Error(codes.Unauthenticated, "missing token") } return handler(srv, ss) } pgrpc.UseStreamInterceptors(AuthStreamInterceptor) ``` 流级限流(示例:简单 Allow 检查): ```go func RateLimitStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { l := limitFor(info.FullMethod) if !l.Allow() { return status.Error(codes.ResourceExhausted, "rate limited") } return handler(srv, ss) } pgrpc.UseStreamInterceptors(RateLimitStreamInterceptor) ``` ## 压缩与编码 注册 gzip 压缩器后,客户端可按需协商使用(新版本通过 encoding 注册): ```go import ( _ "google.golang.org/grpc/encoding/gzip" // 注册 gzip 编解码器 ) // 仅需 import 即可,无额外 ServerOption ``` ## OpenTelemetry 集成(推荐) 使用 StatsHandler(推荐,不与拦截器同时使用,避免重复埋点): ```go import ( otelgrpc "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" ) // 基本接入:使用全局 Tracer/Meter(由 OTEL Provider 初始化) handler := otelgrpc.NewServerHandler( otelgrpc.WithTraceEvents(), // 在 span 中记录消息事件 ) pgrpc.UseOptions(grpc.StatsHandler(handler)) // 忽略某些方法(如健康检查),避免噪声: handler = otelgrpc.NewServerHandler( otelgrpc.WithFilter(func(ctx context.Context, fullMethod string) bool { return fullMethod != "/grpc.health.v1.Health/Check" }), ) pgrpc.UseOptions(grpc.StatsHandler(handler)) ``` 使用拦截器版本(如你更偏好 Interceptor 方案;与 StatsHandler 二选一): ```go import ( otelgrpc "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" ) pgrpc.UseUnaryInterceptors(otelgrpc.UnaryServerInterceptor()) pgrpc.UseStreamInterceptors(otelgrpc.StreamServerInterceptor()) ``` > 注意:不要同时启用 StatsHandler 和拦截器,否则会重复生成 span/metrics。 ## OpenTracing(Jaeger)集成 当使用 Tracing Provider(Jaeger + OpenTracing)时,可使用 opentracing 的 gRPC 拦截器: ```go import ( opentracing "github.com/opentracing/opentracing-go" otgrpc "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" ) pgrpc.UseUnaryInterceptors(otgrpc.OpenTracingServerInterceptor(opentracing.GlobalTracer())) pgrpc.UseStreamInterceptors(otgrpc.OpenTracingStreamServerInterceptor(opentracing.GlobalTracer())) ``` > 与 OTEL 方案互斥:如果已启用 OTEL,请不要再开启 OpenTracing 拦截器,以免重复埋点。