Files
atomctl/templates/project/providers/grpc/options.md.raw

461 lines
11 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# gRPC Server Options & Interceptors Examples
本文件给出一些可直接拷贝使用的示例,配合本包提供的注册函数:
- `UseOptions(opts ...grpc.ServerOption)`
- `UseUnaryInterceptors(inters ...grpc.UnaryServerInterceptor)`
- `UseStreamInterceptors(inters ...grpc.StreamServerInterceptor)`
建议在应用启动或 Provider 初始化阶段调用(在 gRPC 服务构造前)。
> 导入建议:
>
> ```go
> import (
> pgrpc "test/providers/grpc" // 本包
> grpc "google.golang.org/grpc" // 避免命名冲突
> )
> ```
## ServerOption 示例
最大消息大小限制:
```go
pgrpc.UseOptions(
grpc.MaxRecvMsgSize(32<<20), // 32 MiB
grpc.MaxSendMsgSize(32<<20), // 32 MiB
)
```
限制最大并发流(对 HTTP/2 流并发施加上限):
```go
pgrpc.UseOptions(
grpc.MaxConcurrentStreams(1024),
)
```
Keepalive 参数(需要 keepalive 包):
```go
import (
"time"
"google.golang.org/grpc/keepalive"
)
pgrpc.UseOptions(
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: 5 * time.Minute,
MaxConnectionAge: 30 * time.Minute,
MaxConnectionAgeGrace: 5 * time.Minute,
Time: 2 * time.Minute, // ping 间隔
Timeout: 20 * time.Second,
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 1 * time.Minute, // 客户端 ping 最小间隔
PermitWithoutStream: true,
}),
)
```
## UnaryServerInterceptor 示例
简单日志拦截器logrus
```go
import (
"context"
"time"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
func LoggingUnaryInterceptor(
ctx context.Context,
req any,
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (any, error) {
start := time.Now()
resp, err := handler(ctx, req)
dur := time.Since(start)
entry := log.WithFields(log.Fields{
"grpc.method": info.FullMethod,
"grpc.duration_ms": dur.Milliseconds(),
})
if err != nil {
entry.WithError(err).Warn("grpc unary request failed")
} else {
entry.Info("grpc unary request finished")
}
return resp, err
}
// 注册
pgrpc.UseUnaryInterceptors(LoggingUnaryInterceptor)
```
恢复拦截器panic 捕获):
```go
import (
"context"
"fmt"
"runtime/debug"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
)
func RecoveryUnaryInterceptor(
ctx context.Context,
req any,
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (any, error) {
defer func() {
if r := recover(); r != nil {
log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack())
}
}()
return handler(ctx, req)
}
// 或者向客户端返回内部错误:
func RecoveryUnaryInterceptorWithError(
ctx context.Context,
req any,
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (any, error) {
defer func() {
if r := recover(); r != nil {
log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack())
}
}()
resp, err := handler(ctx, req)
if rec := recover(); rec != nil {
return nil, status.Error(codes.Internal, fmt.Sprint(rec))
}
return resp, err
}
pgrpc.UseUnaryInterceptors(RecoveryUnaryInterceptor)
```
链式调用(与其它拦截器共同使用):
```go
pgrpc.UseUnaryInterceptors(LoggingUnaryInterceptor, RecoveryUnaryInterceptor)
```
## StreamServerInterceptor 示例
简单日志拦截器:
```go
import (
"time"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
func LoggingStreamInterceptor(
srv any,
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
start := time.Now()
err := handler(srv, ss)
dur := time.Since(start)
entry := log.WithFields(log.Fields{
"grpc.method": info.FullMethod,
"grpc.is_client_stream": info.IsClientStream,
"grpc.is_server_stream": info.IsServerStream,
"grpc.duration_ms": dur.Milliseconds(),
})
if err != nil {
entry.WithError(err).Warn("grpc stream request failed")
} else {
entry.Info("grpc stream request finished")
}
return err
}
pgrpc.UseStreamInterceptors(LoggingStreamInterceptor)
```
恢复拦截器panic 捕获):
```go
import (
"runtime/debug"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
func RecoveryStreamInterceptor(
srv any,
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) (err error) {
defer func() {
if r := recover(); r != nil {
log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack())
}
}()
return handler(srv, ss)
}
pgrpc.UseStreamInterceptors(RecoveryStreamInterceptor)
```
## 组合与测试小贴士
- 可以多次调用 `UseOptions/UseUnaryInterceptors/UseStreamInterceptors`,最终会在服务构造时链式生效。
- 单元测试中如需隔离,建议使用 `pgrpc.Reset()` 清理已注册的选项和拦截器。
- 若要启用健康检查或反射,请在配置中设置:
- `EnableHealth = true`
- `EnableReflection = true`
## 更多 ServerOption 示例
TLS服务端或 mTLS
```go
import (
"crypto/tls"
grpcCredentials "google.golang.org/grpc/credentials"
)
// 使用自定义 tls.Config可配置 mTLS
var tlsConfig *tls.Config = &tls.Config{ /* ... */ }
pgrpc.UseOptions(
grpc.Creds(grpcCredentials.NewTLS(tlsConfig)),
)
// 或者从证书文件加载(仅服务端 TLS
// pgrpc.UseOptions(grpc.Creds(grpcCredentials.NewServerTLSFromFile(certFile, keyFile)))
```
OpenTelemetry 统计/追踪StatsHandler
```go
import (
otelgrpc "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
)
pgrpc.UseOptions(
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
```
流控/缓冲区调优:
```go
pgrpc.UseOptions(
grpc.InitialWindowSize(1<<20), // 每个流初始窗口(字节)
grpc.InitialConnWindowSize(1<<21), // 连接级窗口
grpc.ReadBufferSize(64<<10), // 读缓冲 64 KiB
grpc.WriteBufferSize(64<<10), // 写缓冲 64 KiB
)
```
连接超时与 Tap Handle早期拦截
```go
import (
"context"
"time"
"google.golang.org/grpc/tap"
)
pgrpc.UseOptions(
grpc.ConnectionTimeout(5 * time.Second),
grpc.InTapHandle(func(ctx context.Context, info *tap.Info) (context.Context, error) {
// 在真正的 RPC 处理前进行快速拒绝如黑名单、IP 检查等)
return ctx, nil
}),
)
```
未知服务处理与工作池:
```go
pgrpc.UseOptions(
grpc.UnknownServiceHandler(func(srv any, stream grpc.ServerStream) error {
// 统一记录未注册方法,或返回自定义错误
return status.Error(codes.Unimplemented, "unknown service/method")
}),
grpc.NumStreamWorkers(8), // 针对 CPU 密集流处理的工作池
)
```
## 更多 Unary 拦截器示例
基于 Metadata 的鉴权:
```go
import (
"context"
"strings"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
)
func AuthUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
md, _ := metadata.FromIncomingContext(ctx)
token := ""
if vals := md.Get("authorization"); len(vals) > 0 {
token = vals[0]
}
if token == "" || !strings.HasPrefix(strings.ToLower(token), "bearer ") {
return nil, status.Error(codes.Unauthenticated, "missing or invalid token")
}
// TODO: 验证 JWT / API-Key
return handler(ctx, req)
}
pgrpc.UseUnaryInterceptors(AuthUnaryInterceptor)
```
方法粒度速率限制x/time/rate
```go
import (
"context"
"sync"
"golang.org/x/time/rate"
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
)
var (
rlmu sync.RWMutex
rlm = map[string]*rate.Limiter{}
)
func limitFor(method string) *rate.Limiter {
rlmu.RLock(); l := rlm[method]; rlmu.RUnlock()
if l != nil { return l }
rlmu.Lock(); defer rlmu.Unlock()
if rlm[method] == nil { rlm[method] = rate.NewLimiter(100, 200) } // 100 rps, burst 200
return rlm[method]
}
func RateLimitUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
l := limitFor(info.FullMethod)
if !l.Allow() {
return nil, status.Error(codes.ResourceExhausted, "rate limited")
}
return handler(ctx, req)
}
pgrpc.UseUnaryInterceptors(RateLimitUnaryInterceptor)
```
Request-ID 注入与日志关联:
```go
import (
"context"
"github.com/google/uuid"
"google.golang.org/grpc/metadata"
)
type ctxKey string
const requestIDKey ctxKey = "request_id"
func RequestIDUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
md, _ := metadata.FromIncomingContext(ctx)
var rid string
if v := md.Get("x-request-id"); len(v) > 0 { rid = v[0] }
if rid == "" { rid = uuid.New().String() }
ctx = context.WithValue(ctx, requestIDKey, rid)
return handler(ctx, req)
}
pgrpc.UseUnaryInterceptors(RequestIDUnaryInterceptor)
```
无超时/超长请求治理(默认超时/拒绝超长):
```go
import (
"context"
"time"
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
)
func DeadlineUnaryInterceptor(max time.Duration) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
if _, ok := ctx.Deadline(); !ok { // 未设置超时
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, max)
defer cancel()
}
resp, err := handler(ctx, req)
if err != nil && ctx.Err() == context.DeadlineExceeded {
return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded")
}
return resp, err
}
}
pgrpc.UseUnaryInterceptors(DeadlineUnaryInterceptor(5*time.Second))
```
## 更多 Stream 拦截器示例
基于 Metadata 的鉴权(流):
```go
import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
)
func AuthStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
md, _ := metadata.FromIncomingContext(ss.Context())
if len(md.Get("authorization")) == 0 {
return status.Error(codes.Unauthenticated, "missing token")
}
return handler(srv, ss)
}
pgrpc.UseStreamInterceptors(AuthStreamInterceptor)
```
流级限流(示例:简单 Allow 检查):
```go
func RateLimitStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
l := limitFor(info.FullMethod)
if !l.Allow() {
return status.Error(codes.ResourceExhausted, "rate limited")
}
return handler(srv, ss)
}
pgrpc.UseStreamInterceptors(RateLimitStreamInterceptor)
```
## 压缩与编码
注册 gzip 压缩器后,客户端可按需协商使用(新版本通过 encoding 注册):
```go
import (
_ "google.golang.org/grpc/encoding/gzip" // 注册 gzip 编解码器
)
// 仅需 import 即可,无额外 ServerOption
```