12 KiB
12 KiB
gRPC Server Options & Interceptors Examples
本文件给出一些可直接拷贝使用的示例,配合本包提供的注册函数:
- UseOptions(opts ...grpc.ServerOption)
- UseUnaryInterceptors(inters ...grpc.UnaryServerInterceptor)
- UseStreamInterceptors(inters ...grpc.StreamServerInterceptor)
建议在应用启动或 Provider 初始化阶段调用(在 gRPC 服务构造前)。
导入建议:
import (
	pgrpc "test/providers/grpc" // 本包
	grpc "google.golang.org/grpc" // 避免命名冲突
)
ServerOption 示例
最大消息大小限制:
// Cap both received and sent message sizes at 32 MiB (32<<20 bytes).
pgrpc.UseOptions(
grpc.MaxRecvMsgSize(32<<20), // 32 MiB
grpc.MaxSendMsgSize(32<<20), // 32 MiB
)
限制最大并发流(对 HTTP/2 流并发施加上限):
// Register a MaxConcurrentStreams limit of 1024.
pgrpc.UseOptions(
grpc.MaxConcurrentStreams(1024),
)
Keepalive 参数(需要 keepalive 包):
import (
"time"
"google.golang.org/grpc/keepalive"
)
// Register server keepalive parameters together with the enforcement policy
// applied to client pings.
pgrpc.UseOptions(
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: 5 * time.Minute,
MaxConnectionAge: 30 * time.Minute,
MaxConnectionAgeGrace: 5 * time.Minute,
Time: 2 * time.Minute, // ping interval
Timeout: 20 * time.Second,
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 1 * time.Minute, // minimum interval between client pings
PermitWithoutStream: true,
}),
)
UnaryServerInterceptor 示例
简单日志拦截器(logrus):
import (
"context"
"time"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
// LoggingUnaryInterceptor is a unary server interceptor that times the
// downstream handler and writes one logrus entry per call.
//
// Each entry carries the full method name ("grpc.method") and the elapsed time
// in whole milliseconds ("grpc.duration_ms"). A call that returns an error is
// logged at Warn level with the error attached; a successful call is logged at
// Info level. The handler's response and error are passed back unchanged.
func LoggingUnaryInterceptor(
	ctx context.Context,
	req any,
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (any, error) {
	begin := time.Now()
	resp, err := handler(ctx, req)
	elapsed := time.Since(begin)

	fields := log.Fields{
		"grpc.method":      info.FullMethod,
		"grpc.duration_ms": elapsed.Milliseconds(),
	}
	logger := log.WithFields(fields)

	if err == nil {
		logger.Info("grpc unary request finished")
		return resp, nil
	}
	logger.WithError(err).Warn("grpc unary request failed")
	return resp, err
}
// Register the interceptor.
pgrpc.UseUnaryInterceptors(LoggingUnaryInterceptor)
恢复拦截器(panic 捕获):
import (
"context"
"fmt"
"runtime/debug"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
)
// RecoveryUnaryInterceptor is a unary server interceptor that stops a panic in
// the downstream handler from propagating further up the call stack.
//
// A recovered panic is logged at Error level together with the method name and
// the current stack trace. This variant only logs: because the results are
// unnamed, the interceptor returns (nil, nil) after a recovered panic. When no
// panic occurs the handler's response and error are returned unchanged.
func RecoveryUnaryInterceptor(
	ctx context.Context,
	req any,
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (any, error) {
	defer func() {
		// recover must be called directly by the deferred function.
		cause := recover()
		if cause == nil {
			return
		}
		logger := log.WithField("grpc.method", info.FullMethod)
		logger.Errorf("panic: %v\n%s", cause, debug.Stack())
	}()

	return handler(ctx, req)
}
// 或者向客户端返回内部错误:
// RecoveryUnaryInterceptorWithError is a unary server interceptor that converts
// a panic in the downstream handler into a codes.Internal status error.
//
// A recovered panic is logged at Error level with the method name and stack
// trace, and the caller receives (nil, Internal) whose message is the formatted
// panic value. When no panic occurs the handler's response and error are
// returned unchanged.
//
// The results are named so that the deferred function can overwrite them:
// recover only has an effect when called directly by a deferred function, so
// the translation to an error has to happen inside the defer.
func RecoveryUnaryInterceptorWithError(
	ctx context.Context,
	req any,
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (resp any, err error) {
	defer func() {
		if r := recover(); r != nil {
			log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack())
			// Discard any partial response and report the failure to the client.
			resp = nil
			err = status.Error(codes.Internal, fmt.Sprint(r))
		}
	}()
	return handler(ctx, req)
}
pgrpc.UseUnaryInterceptors(RecoveryUnaryInterceptor)
链式调用(与其它拦截器共同使用):
pgrpc.UseUnaryInterceptors(LoggingUnaryInterceptor, RecoveryUnaryInterceptor)
StreamServerInterceptor 示例
简单日志拦截器:
import (
"time"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
// LoggingStreamInterceptor is a stream server interceptor that times the
// downstream stream handler and writes one logrus entry when it returns.
//
// Each entry carries the full method name, whether the RPC is client- and/or
// server-streaming, and the elapsed time in whole milliseconds. A stream that
// ends with an error is logged at Warn level with the error attached; otherwise
// it is logged at Info level. The handler's error is returned unchanged.
func LoggingStreamInterceptor(
	srv any,
	ss grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) error {
	begin := time.Now()
	err := handler(srv, ss)
	elapsed := time.Since(begin)

	fields := log.Fields{
		"grpc.method":           info.FullMethod,
		"grpc.is_client_stream": info.IsClientStream,
		"grpc.is_server_stream": info.IsServerStream,
		"grpc.duration_ms":      elapsed.Milliseconds(),
	}
	logger := log.WithFields(fields)

	if err == nil {
		logger.Info("grpc stream request finished")
		return nil
	}
	logger.WithError(err).Warn("grpc stream request failed")
	return err
}
pgrpc.UseStreamInterceptors(LoggingStreamInterceptor)
恢复拦截器(panic 捕获):
import (
	"runtime/debug"

	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)
// RecoveryStreamInterceptor is a stream server interceptor that converts a
// panic in the downstream stream handler into a codes.Internal status error.
//
// A recovered panic is logged at Error level with the method name and stack
// trace. The named result err is then overwritten inside the deferred function
// so the stream terminates with an Internal status instead of a nil error.
// When no panic occurs the handler's error is returned unchanged.
func RecoveryStreamInterceptor(
	srv any,
	ss grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) (err error) {
	defer func() {
		if r := recover(); r != nil {
			log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack())
			// Without this assignment the recovered panic would surface as a nil error.
			err = status.Errorf(codes.Internal, "%v", r)
		}
	}()
	return handler(srv, ss)
}
pgrpc.UseStreamInterceptors(RecoveryStreamInterceptor)
组合与测试小贴士
- 可以多次调用 UseOptions/UseUnaryInterceptors/UseStreamInterceptors,最终会在服务构造时链式生效。
- 单元测试中如需隔离,建议使用 pgrpc.Reset() 清理已注册的选项和拦截器。
- 若要启用健康检查或反射,请在配置中设置:
  EnableHealth = true
  EnableReflection = true
更多 ServerOption 示例
TLS(服务端或 mTLS):
import (
"crypto/tls"
grpcCredentials "google.golang.org/grpc/credentials"
)
// Use a custom tls.Config (this is where mTLS can be configured).
var tlsConfig *tls.Config = &tls.Config{ /* ... */ }
pgrpc.UseOptions(
grpc.Creds(grpcCredentials.NewTLS(tlsConfig)),
)
// Alternatively, load a server-only TLS certificate from files.
// NewServerTLSFromFile returns (credentials, error), so its result cannot be
// passed straight to grpc.Creds; check the error first:
//   creds, err := grpcCredentials.NewServerTLSFromFile(certFile, keyFile)
//   if err != nil { /* handle the error */ }
//   pgrpc.UseOptions(grpc.Creds(creds))
OpenTelemetry 统计/追踪(StatsHandler):
import (
otelgrpc "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
)
// Register the otelgrpc server handler as the gRPC stats handler.
pgrpc.UseOptions(
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
流控/缓冲区调优:
// Tune HTTP/2 flow-control windows and the per-connection I/O buffers.
pgrpc.UseOptions(
grpc.InitialWindowSize(1<<20), // initial per-stream window, in bytes (1 MiB)
grpc.InitialConnWindowSize(1<<21), // connection-level window (2 MiB)
grpc.ReadBufferSize(64<<10), // read buffer, 64 KiB
grpc.WriteBufferSize(64<<10), // write buffer, 64 KiB
)
连接超时与 Tap Handle(早期拦截):
import (
"context"
"time"
"google.golang.org/grpc/tap"
)
// Set a 5-second connection timeout and install a tap handle.
pgrpc.UseOptions(
grpc.ConnectionTimeout(5 * time.Second),
grpc.InTapHandle(func(ctx context.Context, info *tap.Info) (context.Context, error) {
// Fast rejection before the real RPC handling starts (e.g. deny lists, IP checks).
// This placeholder accepts every call.
return ctx, nil
}),
)
未知服务处理与工作池:
// Install an unknown-service handler and set the stream worker count to 8.
pgrpc.UseOptions(
grpc.UnknownServiceHandler(func(srv any, stream grpc.ServerStream) error {
// Central place to log unregistered methods or to return a custom error.
return status.Error(codes.Unimplemented, "unknown service/method")
}),
grpc.NumStreamWorkers(8), // worker pool intended for CPU-intensive stream processing
)
更多 Unary 拦截器示例
基于 Metadata 的鉴权:
import (
"context"
"strings"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
)
// AuthUnaryInterceptor is a unary server interceptor that rejects calls lacking
// a bearer token in the incoming metadata.
//
// It reads the first "authorization" metadata value and requires it to start
// with "bearer " (compared case-insensitively). Calls that fail the check get a
// codes.Unauthenticated status and the handler is not invoked. The token
// itself is not validated here.
func AuthUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
	md, _ := metadata.FromIncomingContext(ctx)

	var token string
	if values := md.Get("authorization"); len(values) != 0 {
		token = values[0]
	}

	// An empty token can never carry the prefix, so this one check also covers
	// the missing-header case.
	hasBearerPrefix := strings.HasPrefix(strings.ToLower(token), "bearer ")
	if !hasBearerPrefix {
		return nil, status.Error(codes.Unauthenticated, "missing or invalid token")
	}

	// TODO: validate the JWT / API key.
	return handler(ctx, req)
}
pgrpc.UseUnaryInterceptors(AuthUnaryInterceptor)
方法粒度速率限制(x/time/rate):
import (
"context"
"sync"
"golang.org/x/time/rate"
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
)
var (
	// rlmu guards rlm.
	rlmu sync.RWMutex
	// rlm holds one limiter per method name, created on first use.
	rlm = map[string]*rate.Limiter{}
)

// limitFor returns the rate limiter associated with method, creating it on
// first use with a rate of 100 events per second and a burst of 200.
//
// The lookup first runs under the read lock. On a miss the write lock is taken
// and the map is checked again before inserting, so concurrent callers for the
// same method all receive the same limiter.
func limitFor(method string) *rate.Limiter {
	rlmu.RLock()
	existing := rlm[method]
	rlmu.RUnlock()
	if existing != nil {
		return existing
	}

	rlmu.Lock()
	defer rlmu.Unlock()

	// Another goroutine may have inserted the limiter between the two locks.
	if raced := rlm[method]; raced != nil {
		return raced
	}
	created := rate.NewLimiter(100, 200) // 100 rps, burst 200
	rlm[method] = created
	return created
}
// RateLimitUnaryInterceptor is a unary server interceptor that applies the
// per-method limiter returned by limitFor.
//
// When the limiter does not allow the call, it returns codes.ResourceExhausted
// without invoking the handler; otherwise the handler's result is returned
// unchanged.
func RateLimitUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
	if allowed := limitFor(info.FullMethod).Allow(); allowed {
		return handler(ctx, req)
	}
	return nil, status.Error(codes.ResourceExhausted, "rate limited")
}
pgrpc.UseUnaryInterceptors(RateLimitUnaryInterceptor)
Request-ID 注入与日志关联:
import (
"context"
"github.com/google/uuid"
"google.golang.org/grpc/metadata"
)
// ctxKey is the type of the context keys defined here.
type ctxKey string

// requestIDKey is the context key under which the request ID is stored.
const requestIDKey ctxKey = "request_id"

// RequestIDUnaryInterceptor is a unary server interceptor that attaches a
// request ID to the context passed to the handler.
//
// The ID is the first "x-request-id" value from the incoming metadata. If that
// is absent or empty, a new UUID string is generated instead. The ID is stored
// in the context under requestIDKey.
func RequestIDUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
	requestID := ""

	md, _ := metadata.FromIncomingContext(ctx)
	if supplied := md.Get("x-request-id"); len(supplied) > 0 {
		requestID = supplied[0]
	}
	if requestID == "" {
		requestID = uuid.New().String()
	}

	return handler(context.WithValue(ctx, requestIDKey, requestID), req)
}
pgrpc.UseUnaryInterceptors(RequestIDUnaryInterceptor)
无超时/超长请求治理(默认超时/拒绝超长):
import (
"context"
"time"
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
)
// DeadlineUnaryInterceptor returns a unary server interceptor that applies a
// default timeout to calls arriving without a deadline.
//
// If the incoming context has no deadline, the handler runs with a context
// that times out after max; contexts that already carry a deadline are passed
// through untouched. When the handler fails and the context's error is
// context.DeadlineExceeded, the result is replaced by (nil,
// codes.DeadlineExceeded). All other results are returned unchanged.
func DeadlineUnaryInterceptor(max time.Duration) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
		if _, hasDeadline := ctx.Deadline(); !hasDeadline {
			// No deadline was set by the caller: impose the default.
			bounded, cancel := context.WithTimeout(ctx, max)
			defer cancel()
			ctx = bounded
		}

		resp, err := handler(ctx, req)
		switch {
		case err == nil:
			return resp, nil
		case ctx.Err() == context.DeadlineExceeded:
			return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded")
		default:
			return resp, err
		}
	}
}
pgrpc.UseUnaryInterceptors(DeadlineUnaryInterceptor(5*time.Second))
更多 Stream 拦截器示例
基于 Metadata 的鉴权(流):
import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
)
// AuthStreamInterceptor is a stream server interceptor that rejects streams
// whose incoming metadata has no "authorization" entry.
//
// Only the presence of at least one value is checked; the value's content is
// not inspected. Streams without the entry end with codes.Unauthenticated and
// the handler is not invoked.
func AuthStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	md, _ := metadata.FromIncomingContext(ss.Context())
	if tokens := md.Get("authorization"); len(tokens) > 0 {
		return handler(srv, ss)
	}
	return status.Error(codes.Unauthenticated, "missing token")
}
pgrpc.UseStreamInterceptors(AuthStreamInterceptor)
流级限流(示例:简单 Allow 检查):
// RateLimitStreamInterceptor is a stream server interceptor that applies the
// per-method limiter returned by limitFor once, when the stream is opened.
//
// When the limiter does not allow the stream, it returns
// codes.ResourceExhausted without invoking the handler; otherwise the handler's
// error is returned unchanged.
func RateLimitStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	if allowed := limitFor(info.FullMethod).Allow(); allowed {
		return handler(srv, ss)
	}
	return status.Error(codes.ResourceExhausted, "rate limited")
}
pgrpc.UseStreamInterceptors(RateLimitStreamInterceptor)
压缩与编码
注册 gzip 压缩器后,客户端可按需协商使用(新版本通过 encoding 注册):
import (
_ "google.golang.org/grpc/encoding/gzip" // 注册 gzip 编解码器
)
// 仅需 import 即可,无额外 ServerOption
OpenTelemetry 集成(推荐)
使用 StatsHandler(推荐,不与拦截器同时使用,避免重复埋点):
import (
otelgrpc "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
)
// Basic setup: uses the global Tracer/Meter (initialized by the OTEL provider).
// NOTE(review): the message-event option in otelgrpc appears to be
// WithMessageEvents(ReceivedEvents, SentEvents); confirm that WithTraceEvents
// exists in the pinned otelgrpc version.
handler := otelgrpc.NewServerHandler(
otelgrpc.WithTraceEvents(), // record message events on the span
)
pgrpc.UseOptions(grpc.StatsHandler(handler))
// Skip selected methods (such as health checks) to reduce noise.
// NOTE(review): otelgrpc.WithFilter appears to take an otelgrpc.Filter, i.e.
// func(*stats.RPCTagInfo) bool, not func(context.Context, string) bool;
// confirm this callback compiles against the pinned otelgrpc version.
// NOTE(review): this second UseOptions call presumably adds another stats
// handler rather than replacing the first; treat the two registrations as
// alternatives unless confirmed otherwise.
handler = otelgrpc.NewServerHandler(
otelgrpc.WithFilter(func(ctx context.Context, fullMethod string) bool {
return fullMethod != "/grpc.health.v1.Health/Check"
}),
)
pgrpc.UseOptions(grpc.StatsHandler(handler))
使用拦截器版本(如你更偏好 Interceptor 方案;与 StatsHandler 二选一):
import (
otelgrpc "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
)
// Register the otelgrpc interceptors for unary and streaming RPCs.
// NOTE(review): these interceptor constructors appear to be deprecated in
// otelgrpc in favour of NewServerHandler and may be absent from newer
// releases; confirm against the pinned otelgrpc version.
pgrpc.UseUnaryInterceptors(otelgrpc.UnaryServerInterceptor())
pgrpc.UseStreamInterceptors(otelgrpc.StreamServerInterceptor())
注意:不要同时启用 StatsHandler 和拦截器,否则会重复生成 span/metrics。
OpenTracing(Jaeger)集成
当使用 Tracing Provider(Jaeger + OpenTracing)时,可使用 opentracing 的 gRPC 拦截器:
import (
opentracing "github.com/opentracing/opentracing-go"
otgrpc "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
)
pgrpc.UseUnaryInterceptors(otgrpc.OpenTracingServerInterceptor(opentracing.GlobalTracer()))
pgrpc.UseStreamInterceptors(otgrpc.OpenTracingStreamServerInterceptor(opentracing.GlobalTracer()))
与 OTEL 方案互斥:如果已启用 OTEL,请不要再开启 OpenTracing 拦截器,以免重复埋点。