diff --git a/templates/project/providers/grpc/options.md.raw b/templates/project/providers/grpc/options.md.raw index 99bb6af..c721bbd 100644 --- a/templates/project/providers/grpc/options.md.raw +++ b/templates/project/providers/grpc/options.md.raw @@ -12,8 +12,8 @@ > > ```go > import ( -> pgrpc "test/providers/grpc" // 本包 -> grpc "google.golang.org/grpc" // 避免命名冲突 +> pgrpc "test/providers/grpc" // 本包 +> grpc "google.golang.org/grpc" // 避免命名冲突 > ) > ``` @@ -23,8 +23,8 @@ ```go pgrpc.UseOptions( - grpc.MaxRecvMsgSize(32<<20), // 32 MiB - grpc.MaxSendMsgSize(32<<20), // 32 MiB +grpc.MaxRecvMsgSize(32<<20), // 32 MiB +grpc.MaxSendMsgSize(32<<20), // 32 MiB ) ``` @@ -32,7 +32,7 @@ pgrpc.UseOptions( ```go pgrpc.UseOptions( - grpc.MaxConcurrentStreams(1024), +grpc.MaxConcurrentStreams(1024), ) ``` @@ -40,22 +40,22 @@ Keepalive 参数(需要 keepalive 包): ```go import ( - "time" - "google.golang.org/grpc/keepalive" +"time" +"google.golang.org/grpc/keepalive" ) pgrpc.UseOptions( - grpc.KeepaliveParams(keepalive.ServerParameters{ - MaxConnectionIdle: 5 * time.Minute, - MaxConnectionAge: 30 * time.Minute, - MaxConnectionAgeGrace: 5 * time.Minute, - Time: 2 * time.Minute, // ping 间隔 - Timeout: 20 * time.Second, - }), - grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: 1 * time.Minute, // 客户端 ping 最小间隔 - PermitWithoutStream: true, - }), +grpc.KeepaliveParams(keepalive.ServerParameters{ +MaxConnectionIdle: 5 * time.Minute, +MaxConnectionAge: 30 * time.Minute, +MaxConnectionAgeGrace: 5 * time.Minute, +Time: 2 * time.Minute, // ping 间隔 +Timeout: 20 * time.Second, +}), +grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ +MinTime: 1 * time.Minute, // 客户端 ping 最小间隔 +PermitWithoutStream: true, +}), ) ``` @@ -65,31 +65,31 @@ pgrpc.UseOptions( ```go import ( - "context" - "time" - log "github.com/sirupsen/logrus" - "google.golang.org/grpc" +"context" +"time" +log "github.com/sirupsen/logrus" +"google.golang.org/grpc" ) func LoggingUnaryInterceptor( - ctx 
context.Context, - req any, - info *grpc.UnaryServerInfo, - handler grpc.UnaryHandler, +ctx context.Context, +req any, +info *grpc.UnaryServerInfo, +handler grpc.UnaryHandler, ) (any, error) { - start := time.Now() - resp, err := handler(ctx, req) - dur := time.Since(start) - entry := log.WithFields(log.Fields{ - "grpc.method": info.FullMethod, - "grpc.duration_ms": dur.Milliseconds(), - }) - if err != nil { - entry.WithError(err).Warn("grpc unary request failed") - } else { - entry.Info("grpc unary request finished") - } - return resp, err +start := time.Now() +resp, err := handler(ctx, req) +dur := time.Since(start) +entry := log.WithFields(log.Fields{ +"grpc.method": info.FullMethod, +"grpc.duration_ms": dur.Milliseconds(), +}) +if err != nil { +entry.WithError(err).Warn("grpc unary request failed") +} else { +entry.Info("grpc unary request finished") +} +return resp, err } // 注册 @@ -100,46 +100,46 @@ pgrpc.UseUnaryInterceptors(LoggingUnaryInterceptor) ```go import ( - "context" - "fmt" - "runtime/debug" - log "github.com/sirupsen/logrus" - "google.golang.org/grpc" - "google.golang.org/grpc/status" - "google.golang.org/grpc/codes" +"context" +"fmt" +"runtime/debug" +log "github.com/sirupsen/logrus" +"google.golang.org/grpc" +"google.golang.org/grpc/status" +"google.golang.org/grpc/codes" ) func RecoveryUnaryInterceptor( - ctx context.Context, - req any, - info *grpc.UnaryServerInfo, - handler grpc.UnaryHandler, +ctx context.Context, +req any, +info *grpc.UnaryServerInfo, +handler grpc.UnaryHandler, ) (any, error) { - defer func() { - if r := recover(); r != nil { - log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack()) - } - }() - return handler(ctx, req) +defer func() { +if r := recover() ; r != nil { +log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack()) +} +}() +return handler(ctx, req) } // 或者向客户端返回内部错误: func RecoveryUnaryInterceptorWithError( - ctx context.Context, - req any, - info 
*grpc.UnaryServerInfo, - handler grpc.UnaryHandler, +ctx context.Context, +req any, +info *grpc.UnaryServerInfo, +handler grpc.UnaryHandler, -) (any, error) { - defer func() { - if r := recover(); r != nil { - log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack()) - } - }() - resp, err := handler(ctx, req) - if rec := recover(); rec != nil { - return nil, status.Error(codes.Internal, fmt.Sprint(rec)) - } - return resp, err +) (resp any, err error) { +defer func() { +// recover only takes effect when called directly by the deferred function, +// so the panic is converted here; the named results let the closure set the return values. +if r := recover(); r != nil { +log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack()) +resp = nil +err = status.Error(codes.Internal, fmt.Sprint(r)) +} +}() +return handler(ctx, req) } pgrpc.UseUnaryInterceptors(RecoveryUnaryInterceptor) @@ -157,32 +157,32 @@ pgrpc.UseUnaryInterceptors(LoggingUnaryInterceptor, RecoveryUnaryInterceptor) ```go import ( - "time" - log "github.com/sirupsen/logrus" - "google.golang.org/grpc" +"time" +log "github.com/sirupsen/logrus" +"google.golang.org/grpc" ) func LoggingStreamInterceptor( - srv any, - ss grpc.ServerStream, - info *grpc.StreamServerInfo, - handler grpc.StreamHandler, +srv any, +ss grpc.ServerStream, +info *grpc.StreamServerInfo, +handler grpc.StreamHandler, ) error { - start := time.Now() - err := handler(srv, ss) - dur := time.Since(start) - entry := log.WithFields(log.Fields{ - "grpc.method": info.FullMethod, - "grpc.is_client_stream": info.IsClientStream, - "grpc.is_server_stream": info.IsServerStream, - "grpc.duration_ms": dur.Milliseconds(), - }) - if err != nil { - entry.WithError(err).Warn("grpc stream request failed") - } else { - entry.Info("grpc stream request finished") - } - return err +start := time.Now() +err := handler(srv, ss) +dur := time.Since(start) +entry := log.WithFields(log.Fields{ +"grpc.method": info.FullMethod, +"grpc.is_client_stream": info.IsClientStream, +"grpc.is_server_stream": info.IsServerStream, +"grpc.duration_ms": 
dur.Milliseconds(), +}) +if err != nil { +entry.WithError(err).Warn("grpc stream request failed") +} else { +entry.Info("grpc stream request finished") +} +return err } pgrpc.UseStreamInterceptors(LoggingStreamInterceptor) @@ -192,23 +192,23 @@ pgrpc.UseStreamInterceptors(LoggingStreamInterceptor) ```go import ( - "runtime/debug" - log "github.com/sirupsen/logrus" - "google.golang.org/grpc" +"runtime/debug" +log "github.com/sirupsen/logrus" +"google.golang.org/grpc" ) func RecoveryStreamInterceptor( - srv any, - ss grpc.ServerStream, - info *grpc.StreamServerInfo, - handler grpc.StreamHandler, +srv any, +ss grpc.ServerStream, +info *grpc.StreamServerInfo, +handler grpc.StreamHandler, ) (err error) { - defer func() { - if r := recover(); r != nil { - log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack()) - } - }() - return handler(srv, ss) +defer func() { +if r := recover() ; r != nil { +log.WithField("grpc.method", info.FullMethod).Errorf("panic: %v\n%s", r, debug.Stack()) +} +}() +return handler(srv, ss) } pgrpc.UseStreamInterceptors(RecoveryStreamInterceptor) @@ -219,8 +219,8 @@ pgrpc.UseStreamInterceptors(RecoveryStreamInterceptor) - 可以多次调用 `UseOptions/UseUnaryInterceptors/UseStreamInterceptors`,最终会在服务构造时链式生效。 - 单元测试中如需隔离,建议使用 `pgrpc.Reset()` 清理已注册的选项和拦截器。 - 若要启用健康检查或反射,请在配置中设置: - - `EnableHealth = true` - - `EnableReflection = true` +- `EnableHealth = true` +- `EnableReflection = true` ## 更多 ServerOption 示例 @@ -228,14 +228,14 @@ TLS(服务端或 mTLS): ```go import ( - "crypto/tls" - grpcCredentials "google.golang.org/grpc/credentials" +"crypto/tls" +grpcCredentials "google.golang.org/grpc/credentials" ) // 使用自定义 tls.Config(可配置 mTLS) var tlsConfig *tls.Config = &tls.Config{ /* ... 
*/ } pgrpc.UseOptions( - grpc.Creds(grpcCredentials.NewTLS(tlsConfig)), +grpc.Creds(grpcCredentials.NewTLS(tlsConfig)), ) // 或者从证书文件加载(仅服务端 TLS) @@ -246,11 +246,11 @@ OpenTelemetry 统计/追踪(StatsHandler): ```go import ( - otelgrpc "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" +otelgrpc "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" ) pgrpc.UseOptions( - grpc.StatsHandler(otelgrpc.NewServerHandler()), +grpc.StatsHandler(otelgrpc.NewServerHandler()), ) ``` @@ -258,10 +258,10 @@ pgrpc.UseOptions( ```go pgrpc.UseOptions( - grpc.InitialWindowSize(1<<20), // 每个流初始窗口(字节) - grpc.InitialConnWindowSize(1<<21), // 连接级窗口 - grpc.ReadBufferSize(64<<10), // 读缓冲 64 KiB - grpc.WriteBufferSize(64<<10), // 写缓冲 64 KiB +grpc.InitialWindowSize(1<<20), // 每个流初始窗口(字节) +grpc.InitialConnWindowSize(1<<21), // 连接级窗口 +grpc.ReadBufferSize(64<<10), // 读缓冲 64 KiB +grpc.WriteBufferSize(64<<10), // 写缓冲 64 KiB ) ``` @@ -269,17 +269,17 @@ pgrpc.UseOptions( ```go import ( - "context" - "time" - "google.golang.org/grpc/tap" +"context" +"time" +"google.golang.org/grpc/tap" ) pgrpc.UseOptions( - grpc.ConnectionTimeout(5 * time.Second), - grpc.InTapHandle(func(ctx context.Context, info *tap.Info) (context.Context, error) { - // 在真正的 RPC 处理前进行快速拒绝(如黑名单、IP 检查等) - return ctx, nil - }), +grpc.ConnectionTimeout(5 * time.Second), +grpc.InTapHandle(func(ctx context.Context, info *tap.Info) (context.Context, error) { +// 在真正的 RPC 处理前进行快速拒绝(如黑名单、IP 检查等) +return ctx, nil +}), ) ``` @@ -287,11 +287,11 @@ pgrpc.UseOptions( ```go pgrpc.UseOptions( - grpc.UnknownServiceHandler(func(srv any, stream grpc.ServerStream) error { - // 统一记录未注册方法,或返回自定义错误 - return status.Error(codes.Unimplemented, "unknown service/method") - }), - grpc.NumStreamWorkers(8), // 针对 CPU 密集流处理的工作池 +grpc.UnknownServiceHandler(func(srv any, stream grpc.ServerStream) error { +// 统一记录未注册方法,或返回自定义错误 +return status.Error(codes.Unimplemented, "unknown service/method") +}), 
+grpc.NumStreamWorkers(8), // 针对 CPU 密集流处理的工作池 ) ``` @@ -301,24 +301,24 @@ pgrpc.UseOptions( ```go import ( - "context" - "strings" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" - "google.golang.org/grpc/codes" +"context" +"strings" +"google.golang.org/grpc/metadata" +"google.golang.org/grpc/status" +"google.golang.org/grpc/codes" ) func AuthUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { - md, _ := metadata.FromIncomingContext(ctx) - token := "" - if vals := md.Get("authorization"); len(vals) > 0 { - token = vals[0] - } - if token == "" || !strings.HasPrefix(strings.ToLower(token), "bearer ") { - return nil, status.Error(codes.Unauthenticated, "missing or invalid token") - } - // TODO: 验证 JWT / API-Key - return handler(ctx, req) +md, _ := metadata.FromIncomingContext(ctx) +token := "" +if vals := md.Get("authorization"); len(vals) > 0 { +token = vals[0] +} +if token == "" || !strings.HasPrefix(strings.ToLower(token), "bearer ") { +return nil, status.Error(codes.Unauthenticated, "missing or invalid token") +} +// TODO: 验证 JWT / API-Key +return handler(ctx, req) } pgrpc.UseUnaryInterceptors(AuthUnaryInterceptor) @@ -328,32 +328,32 @@ pgrpc.UseUnaryInterceptors(AuthUnaryInterceptor) ```go import ( - "context" - "sync" - "golang.org/x/time/rate" - "google.golang.org/grpc/status" - "google.golang.org/grpc/codes" +"context" +"sync" +"golang.org/x/time/rate" +"google.golang.org/grpc/status" +"google.golang.org/grpc/codes" ) var ( - rlmu sync.RWMutex - rlm = map[string]*rate.Limiter{} +rlmu sync.RWMutex +rlm = map[string]*rate.Limiter{} ) func limitFor(method string) *rate.Limiter { - rlmu.RLock(); l := rlm[method]; rlmu.RUnlock() - if l != nil { return l } - rlmu.Lock(); defer rlmu.Unlock() - if rlm[method] == nil { rlm[method] = rate.NewLimiter(100, 200) } // 100 rps, burst 200 - return rlm[method] +rlmu.RLock() ; l := rlm[method]; rlmu.RUnlock() +if l != nil { return l } 
+rlmu.Lock() ; defer rlmu.Unlock() +if rlm[method] == nil { rlm[method] = rate.NewLimiter(100, 200) } // 100 rps, burst 200 +return rlm[method] } func RateLimitUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { - l := limitFor(info.FullMethod) - if !l.Allow() { - return nil, status.Error(codes.ResourceExhausted, "rate limited") - } - return handler(ctx, req) +l := limitFor(info.FullMethod) +if !l.Allow() { +return nil, status.Error(codes.ResourceExhausted, "rate limited") +} +return handler(ctx, req) } pgrpc.UseUnaryInterceptors(RateLimitUnaryInterceptor) @@ -363,21 +363,21 @@ Request-ID 注入与日志关联: ```go import ( - "context" - "github.com/google/uuid" - "google.golang.org/grpc/metadata" +"context" +"github.com/google/uuid" +"google.golang.org/grpc/metadata" ) type ctxKey string const requestIDKey ctxKey = "request_id" func RequestIDUnaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { - md, _ := metadata.FromIncomingContext(ctx) - var rid string - if v := md.Get("x-request-id"); len(v) > 0 { rid = v[0] } - if rid == "" { rid = uuid.New().String() } - ctx = context.WithValue(ctx, requestIDKey, rid) - return handler(ctx, req) +md, _ := metadata.FromIncomingContext(ctx) +var rid string +if v := md.Get("x-request-id"); len(v) > 0 { rid = v[0] } +if rid == "" { rid = uuid.New().String() } +ctx = context.WithValue(ctx, requestIDKey, rid) +return handler(ctx, req) } pgrpc.UseUnaryInterceptors(RequestIDUnaryInterceptor) @@ -387,25 +387,25 @@ pgrpc.UseUnaryInterceptors(RequestIDUnaryInterceptor) ```go import ( - "context" - "time" - "google.golang.org/grpc/status" - "google.golang.org/grpc/codes" +"context" +"time" +"google.golang.org/grpc/status" +"google.golang.org/grpc/codes" ) func DeadlineUnaryInterceptor(max time.Duration) grpc.UnaryServerInterceptor { - return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler 
grpc.UnaryHandler) (any, error) { - if _, ok := ctx.Deadline(); !ok { // 未设置超时 - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, max) - defer cancel() - } - resp, err := handler(ctx, req) - if err != nil && ctx.Err() == context.DeadlineExceeded { - return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded") - } - return resp, err - } +return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { +if _, ok := ctx.Deadline() ; !ok { // 未设置超时 +var cancel context.CancelFunc +ctx, cancel = context.WithTimeout(ctx, max) +defer cancel() +} +resp, err := handler(ctx, req) +if err != nil && ctx.Err() == context.DeadlineExceeded { +return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded") +} +return resp, err +} } pgrpc.UseUnaryInterceptors(DeadlineUnaryInterceptor(5*time.Second)) @@ -417,17 +417,17 @@ pgrpc.UseUnaryInterceptors(DeadlineUnaryInterceptor(5*time.Second)) ```go import ( - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" - "google.golang.org/grpc/codes" +"google.golang.org/grpc/metadata" +"google.golang.org/grpc/status" +"google.golang.org/grpc/codes" ) func AuthStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - md, _ := metadata.FromIncomingContext(ss.Context()) - if len(md.Get("authorization")) == 0 { - return status.Error(codes.Unauthenticated, "missing token") - } - return handler(srv, ss) +md, _ := metadata.FromIncomingContext(ss.Context()) +if len(md.Get("authorization")) == 0 { +return status.Error(codes.Unauthenticated, "missing token") +} +return handler(srv, ss) } pgrpc.UseStreamInterceptors(AuthStreamInterceptor) @@ -437,11 +437,11 @@ pgrpc.UseStreamInterceptors(AuthStreamInterceptor) ```go func RateLimitStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - l := limitFor(info.FullMethod) - if !l.Allow() { - return 
status.Error(codes.ResourceExhausted, "rate limited") - } - return handler(srv, ss) +l := limitFor(info.FullMethod) +if !l.Allow() { +return status.Error(codes.ResourceExhausted, "rate limited") +} +return handler(srv, ss) } pgrpc.UseStreamInterceptors(RateLimitStreamInterceptor) @@ -453,8 +453,61 @@ pgrpc.UseStreamInterceptors(RateLimitStreamInterceptor) ```go import ( - _ "google.golang.org/grpc/encoding/gzip" // 注册 gzip 编解码器 +_ "google.golang.org/grpc/encoding/gzip" // 注册 gzip 编解码器 ) // 仅需 import 即可,无额外 ServerOption ``` + +## OpenTelemetry 集成(推荐) + +使用 StatsHandler(推荐,不与拦截器同时使用,避免重复埋点): + +```go +import ( +otelgrpc "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" +) + +// 基本接入:使用全局 Tracer/Meter(由 OTEL Provider 初始化) +handler := otelgrpc.NewServerHandler( +otelgrpc.WithTraceEvents(), // 在 span 中记录消息事件 +) +pgrpc.UseOptions(grpc.StatsHandler(handler)) + +// 忽略某些方法(如健康检查),避免噪声: +handler = otelgrpc.NewServerHandler( +otelgrpc.WithFilter(func(ctx context.Context, fullMethod string) bool { +return fullMethod != "/grpc.health.v1.Health/Check" +}), +) +pgrpc.UseOptions(grpc.StatsHandler(handler)) +``` + +使用拦截器版本(如你更偏好 Interceptor 方案;与 StatsHandler 二选一): + +```go +import ( +otelgrpc "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" +) + +pgrpc.UseUnaryInterceptors(otelgrpc.UnaryServerInterceptor()) +pgrpc.UseStreamInterceptors(otelgrpc.StreamServerInterceptor()) +``` + +> 注意:不要同时启用 StatsHandler 和拦截器,否则会重复生成 span/metrics。 + +## OpenTracing(Jaeger)集成 + +当使用 Tracing Provider(Jaeger + OpenTracing)时,可使用 opentracing 的 gRPC 拦截器: + +```go +import ( +opentracing "github.com/opentracing/opentracing-go" +otgrpc "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" +) + +pgrpc.UseUnaryInterceptors(otgrpc.OpenTracingServerInterceptor(opentracing.GlobalTracer())) +pgrpc.UseStreamInterceptors(otgrpc.OpenTracingStreamServerInterceptor(opentracing.GlobalTracer())) +``` + +> 与 OTEL 方案互斥:如果已启用 OTEL,请不要再开启 OpenTracing 拦截器,以免重复埋点。 diff 
--git a/templates/project/providers/otel/README.md.raw b/templates/project/providers/otel/README.md.raw new file mode 100644 index 0000000..907e42c --- /dev/null +++ b/templates/project/providers/otel/README.md.raw @@ -0,0 +1,121 @@ +OpenTelemetry Provider (OTLP Traces + Metrics) + +该 Provider 基于 OpenTelemetry Go SDK,初始化全局 Tracer 与 Meter,支持 OTLP(gRPC/HTTP) 导出,并收集运行时指标。 + +配置(config.toml) + +``` +[OTEL] +ServiceName = "my-service" +Version = "1.0.0" +Env = "dev" + +# 导出端点(二选一) +EndpointGRPC = "otel-collector:4317" +EndpointHTTP = "otel-collector:4318" + +# 认证(可选) +Token = "Bearer <token>" # 也可只填纯 token,Provider 会自动补齐 Bearer 前缀 + +# 安全(可选) +InsecureGRPC = true # gRPC 导出是否使用 insecure +InsecureHTTP = true # HTTP 导出是否使用 insecure + +# 采样(可选) +Sampler = "always" # always|ratio +SamplerRatio = 0.1 # Sampler=ratio 时生效,0..1 + +# 批处理(可选,毫秒) +BatchTimeoutMs = 5000 +ExportTimeoutMs = 10000 +MaxQueueSize = 2048 +MaxExportBatchSize = 512 + +# 指标(可选,毫秒) +MetricReaderIntervalMs = 10000 # 指标导出周期 +RuntimeReadMemStatsIntervalMs = 5000 # 运行时指标读取周期 +``` + +启用 + +``` +import "test/providers/otel" + +func providers() container.Providers { + return container.Providers{ + otel.DefaultProvider(), + } +} +``` + +使用 + +- Traces: 通过 `go.opentelemetry.io/otel` 获取全局 Tracer,或使用仓库提供的 `providers/otel/funcs.go` 包装。 + +``` +ctx, span := otel.Tracer("my-service").Start(ctx, "my-op") +// ... 
+span.End() +``` + +- Metrics: 通过 `otel.Meter("my-service")` 创建仪表,或使用 `providers/otel/funcs.go` 的便捷函数。 + +与 Tracing Provider 的区别与场景建议 + +- Tracing Provider(Jaeger + OpenTracing)只做链路,适合已有 OpenTracing 项目; +- OTEL Provider(OpenTelemetry)统一 Traces+Metrics,对接 OTLP 生态,适合新项目或希望统一可观测性; +- 可先混用:保留 Jaeger 链路,同时启用 OTEL 运行时指标,逐步迁移。 + +快速启动(本地 Collector) + +最小化 docker-compose: + +``` +services: + otel-collector: + image: otel/opentelemetry-collector:0.104.0 + command: ["--config=/etc/otelcol-config.yml"] + volumes: + - ./otelcol-config.yml:/etc/otelcol-config.yml:ro + ports: + - "4317:4317" # OTLP gRPC + - "4318:4318" # OTLP HTTP +``` + +示例 otelcol-config.yml: + +``` +receivers: + otlp: + protocols: + grpc: + http: +exporters: + debug: + verbosity: detailed +processors: + batch: +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [debug] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [debug] +``` + +应用端: + +``` +[OTEL] +EndpointGRPC = "127.0.0.1:4317" +InsecureGRPC = true +``` + +故障与降级 + +- Collector/网络异常:OTEL SDK 异步批处理,不阻塞业务;可能丢点/丢指标; +- 启动失败:初始化报错会阻止启动;如需“不可达也不影响启动”,可加开关降级为 no-op(可按需补充)。 diff --git a/templates/project/providers/otel/config.go.tpl b/templates/project/providers/otel/config.go.tpl index f09a9a8..df92cb8 100644 --- a/templates/project/providers/otel/config.go.tpl +++ b/templates/project/providers/otel/config.go.tpl @@ -21,13 +21,32 @@ func DefaultProvider() container.ProviderContainer { } type Config struct { - ServiceName string - Version string - Env string + ServiceName string + Version string + Env string - EndpointGRPC string - EndpointHTTP string - Token string + EndpointGRPC string + EndpointHTTP string + Token string + + // Connection security + InsecureGRPC bool // if true, use grpc insecure for OTLP gRPC + InsecureHTTP bool // if true, use http insecure for OTLP HTTP + + // Tracing sampler + // Sampler: "always" (default) or "ratio" + Sampler string + SamplerRatio float64 // used when Sampler == 
"ratio"; 0..1 + + // Tracing batcher options (milliseconds) + BatchTimeoutMs uint + ExportTimeoutMs uint + MaxQueueSize int + MaxExportBatchSize int + + // Metrics options (milliseconds) + MetricReaderIntervalMs uint // export interval for PeriodicReader + RuntimeReadMemStatsIntervalMs uint // runtime metrics min read interval } func (c *Config) format() { diff --git a/templates/project/providers/otel/provider.go.tpl b/templates/project/providers/otel/provider.go.tpl index 967aa0a..db300d6 100644 --- a/templates/project/providers/otel/provider.go.tpl +++ b/templates/project/providers/otel/provider.go.tpl @@ -26,6 +26,17 @@ import ( "google.golang.org/grpc/encoding/gzip" ) +// formatAuth formats token into an Authorization header value. +func formatAuth(token string) string { + if token == "" { + return "" + } + if len(token) > 7 && (token[:7] == "Bearer " || token[:7] == "bearer ") { + return token + } + return "Bearer " + token +} + func Provide(opts ...opt.Option) error { o := opt.New(opts...) var config Config @@ -82,7 +93,7 @@ func (o *builder) initResource(ctx context.Context) (err error) { semconv.HostNameKey.String(hostName), // 主机名 ), ) - return + return err } func (o *builder) initMeterProvider(ctx context.Context) (err error) { @@ -92,34 +103,35 @@ func (o *builder) initMeterProvider(ctx context.Context) (err error) { otlpmetricgrpc.WithCompressor(gzip.Name), } - if o.config.Token != "" { - headers := map[string]string{"Authentication": o.config.Token} - opts = append(opts, otlpmetricgrpc.WithHeaders(headers)) + if h := formatAuth(o.config.Token); h != "" { + opts = append(opts, otlpmetricgrpc.WithHeaders(map[string]string{"Authorization": h})) } - exporter, err := otlpmetricgrpc.New(ctx, opts...) + exporter, err := otlpmetricgrpc.New(ctx, opts...) 
if err != nil { return nil, err } return exporter, nil } - exporterHttpFunc := func(ctx context.Context) (sdkmetric.Exporter, error) { - opts := []otlpmetrichttp.Option{ - otlpmetrichttp.WithEndpoint(o.config.EndpointHTTP), - otlpmetrichttp.WithCompression(1), - } + exporterHttpFunc := func(ctx context.Context) (sdkmetric.Exporter, error) { + opts := []otlpmetrichttp.Option{ + otlpmetrichttp.WithEndpoint(o.config.EndpointHTTP), + otlpmetrichttp.WithCompression(1), + } + if o.config.InsecureHTTP { + opts = append(opts, otlpmetrichttp.WithInsecure()) + } + if h := formatAuth(o.config.Token); h != "" { + opts = append(opts, otlpmetrichttp.WithHeaders(map[string]string{"Authorization": h})) + } - if o.config.Token != "" { - opts = append(opts, otlpmetrichttp.WithURLPath(o.config.Token)) - } - - exporter, err := otlpmetrichttp.New(ctx, opts...) - if err != nil { - return nil, err - } - return exporter, nil - } + exporter, err := otlpmetrichttp.New(ctx, opts...) + if err != nil { + return nil, err + } + return exporter, nil + } var exporter sdkmetric.Exporter if o.config.EndpointHTTP != "" { @@ -129,18 +141,27 @@ func (o *builder) initMeterProvider(ctx context.Context) (err error) { } if err != nil { - return + return err } - meterProvider := sdkmetric.NewMeterProvider( - sdkmetric.WithReader( - sdkmetric.NewPeriodicReader(exporter), - ), - sdkmetric.WithResource(o.resource), - ) + // periodic reader with optional custom interval + var readerOpts []sdkmetric.PeriodicReaderOption + if o.config.MetricReaderIntervalMs > 0 { + readerOpts = append(readerOpts, sdkmetric.WithInterval(time.Duration(o.config.MetricReaderIntervalMs)*time.Millisecond)) + } + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader( + sdkmetric.NewPeriodicReader(exporter, readerOpts...), + ), + sdkmetric.WithResource(o.resource), + ) otel.SetMeterProvider(meterProvider) - err = runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second * 5)) + interval := 5 * time.Second + if 
o.config.RuntimeReadMemStatsIntervalMs > 0 { + interval = time.Duration(o.config.RuntimeReadMemStatsIntervalMs) * time.Millisecond + } + err = runtime.Start(runtime.WithMinimumReadMemStatsInterval(interval)) if err != nil { return errors.Wrapf(err, "Failed to start runtime metrics") } @@ -151,23 +172,21 @@ func (o *builder) initMeterProvider(ctx context.Context) (err error) { } }) - return + return err } func (o *builder) initTracerProvider(ctx context.Context) error { exporterGrpcFunc := func(ctx context.Context) (*otlptrace.Exporter, error) { - opts := []otlptracegrpc.Option{ - otlptracegrpc.WithCompressor(gzip.Name), - otlptracegrpc.WithEndpoint(o.config.EndpointGRPC), - otlptracegrpc.WithInsecure(), // 添加不安全连接选项 - } + opts := []otlptracegrpc.Option{ + otlptracegrpc.WithCompressor(gzip.Name), + otlptracegrpc.WithEndpoint(o.config.EndpointGRPC), + } + if o.config.InsecureGRPC { + opts = append(opts, otlptracegrpc.WithInsecure()) + } - if o.config.Token != "" { - headers := map[string]string{ - "Authentication": o.config.Token, - "authorization": o.config.Token, // 添加标准认证头 - } - opts = append(opts, otlptracegrpc.WithHeaders(headers)) + if h := formatAuth(o.config.Token); h != "" { + opts = append(opts, otlptracegrpc.WithHeaders(map[string]string{"Authorization": h})) } log.Debugf("Creating GRPC trace exporter with endpoint: %s", o.config.EndpointGRPC) @@ -188,18 +207,16 @@ func (o *builder) initTracerProvider(ctx context.Context) error { } exporterHttpFunc := func(ctx context.Context) (*otlptrace.Exporter, error) { - opts := []otlptracehttp.Option{ - otlptracehttp.WithInsecure(), - otlptracehttp.WithCompression(1), - otlptracehttp.WithEndpoint(o.config.EndpointHTTP), - } + opts := []otlptracehttp.Option{ + otlptracehttp.WithCompression(1), + otlptracehttp.WithEndpoint(o.config.EndpointHTTP), + } + if o.config.InsecureHTTP { + opts = append(opts, otlptracehttp.WithInsecure()) + } - if o.config.Token != "" { - opts = append(opts, - 
otlptracehttp.WithHeaders(map[string]string{ - "Authentication": o.config.Token, - }), - ) + if h := formatAuth(o.config.Token); h != "" { + opts = append(opts, otlptracehttp.WithHeaders(map[string]string{"Authorization": h})) } log.Debugf("Creating HTTP trace exporter with endpoint: %s", o.config.EndpointHTTP) @@ -225,11 +242,39 @@ func (o *builder) initTracerProvider(ctx context.Context) error { return err } - traceProvider := sdktrace.NewTracerProvider( - sdktrace.WithSampler(sdktrace.AlwaysSample()), - sdktrace.WithResource(o.resource), - sdktrace.WithBatcher(exporter), - ) + // Sampler + sampler := sdktrace.AlwaysSample() + if o.config.Sampler == "ratio" { + ratio := o.config.SamplerRatio + if ratio <= 0 { + ratio = 0 + } + if ratio > 1 { + ratio = 1 + } + sampler = sdktrace.ParentBased(sdktrace.TraceIDRatioBased(ratio)) + } + + // Batcher options + var batchOpts []sdktrace.BatchSpanProcessorOption + if o.config.BatchTimeoutMs > 0 { + batchOpts = append(batchOpts, sdktrace.WithBatchTimeout(time.Duration(o.config.BatchTimeoutMs)*time.Millisecond)) + } + if o.config.ExportTimeoutMs > 0 { + batchOpts = append(batchOpts, sdktrace.WithExportTimeout(time.Duration(o.config.ExportTimeoutMs)*time.Millisecond)) + } + if o.config.MaxQueueSize > 0 { + batchOpts = append(batchOpts, sdktrace.WithMaxQueueSize(o.config.MaxQueueSize)) + } + if o.config.MaxExportBatchSize > 0 { + batchOpts = append(batchOpts, sdktrace.WithMaxExportBatchSize(o.config.MaxExportBatchSize)) + } + + traceProvider := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sampler), + sdktrace.WithResource(o.resource), + sdktrace.WithBatcher(exporter, batchOpts...), + ) container.AddCloseAble(func() { log.Error("shut down") if err := traceProvider.Shutdown(ctx); err != nil {