fix: issues

This commit is contained in:
Rogee
2025-01-07 17:16:15 +08:00
parent 49f68ca533
commit c5d0b643cf

View File

@@ -10,10 +10,13 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/propagation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace" sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.15.0" semconv "go.opentelemetry.io/otel/semconv/v1.15.0"
@@ -30,125 +33,192 @@ func Provide(opts ...opt.Option) error {
return container.Container.Provide(func(ctx context.Context) (*OTEL, error) { return container.Container.Provide(func(ctx context.Context) (*OTEL, error) {
o := &OTEL{ o := &OTEL{
Tracer: otel.Tracer(config.ServiceName), Tracer: otel.Tracer(config.ServiceName),
config: &config,
} }
var err error err := o.initResource(ctx)
o.Resource, err = initResource(ctx, &config)
if err != nil { if err != nil {
return o, errors.Wrapf(err, "Failed to create OpenTelemetry resource") return o, errors.Wrapf(err, "Failed to create OpenTelemetry resource")
} }
if config.EndpointHTTP != "" { if err := o.initMeterProvider(ctx); err != nil {
o.Exporter, o.SpanProcessor, err = initHTTPExporterAndSpanProcessor(ctx, &config) return o, errors.Wrapf(err, "Failed to create OpenTelemetry metric provider")
if err != nil {
return o, errors.Wrapf(err, "Failed to create OpenTelemetry trace exporter")
}
} else if config.EndpointGRPC != "" {
o.Exporter, o.SpanProcessor, err = initGrpcExporterAndSpanProcessor(ctx, &config)
if err != nil {
return o, errors.Wrapf(err, "Failed to create OpenTelemetry trace exporter")
}
} else {
return o, errors.New("http or grpc endpoint is required")
} }
traceProvider := sdktrace.NewTracerProvider( if err := o.initTracerProvider(ctx); err != nil {
sdktrace.WithSampler(sdktrace.AlwaysSample()), return o, errors.Wrapf(err, "Failed to create OpenTelemetry tracer provider")
sdktrace.WithResource(o.Resource), }
sdktrace.WithSpanProcessor(o.SpanProcessor),
)
otel.SetTracerProvider(traceProvider)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))
container.AddCloseAble(func() {
cxt, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := o.Exporter.Shutdown(cxt); err != nil {
otel.Handle(err)
}
})
return o, nil return o, nil
}, o.DiOptions()...) }, o.DiOptions()...)
} }
type OTEL struct { type OTEL struct {
Tracer trace.Tracer config *Config
Resource *resource.Resource
Exporter *otlptrace.Exporter Tracer trace.Tracer
SpanProcessor sdktrace.SpanProcessor Resource *resource.Resource
} }
func initResource(ctx context.Context, conf *Config) (*resource.Resource, error) { func (o *OTEL) initResource(ctx context.Context) (err error) {
hostName, _ := os.Hostname() hostName, _ := os.Hostname()
r, err := resource.New( o.Resource, err = resource.New(
ctx, ctx,
resource.WithFromEnv(), resource.WithFromEnv(),
resource.WithProcess(), resource.WithProcess(),
resource.WithTelemetrySDK(), resource.WithTelemetrySDK(),
resource.WithHost(), resource.WithHost(),
resource.WithAttributes( resource.WithAttributes(
semconv.ServiceNameKey.String(conf.ServiceName), // semconv.ServiceNameKey.String(o.config.ServiceName), //
semconv.ServiceVersionKey.String(conf.Version), // semconv.ServiceVersionKey.String(o.config.Version), //
semconv.DeploymentEnvironmentKey.String(conf.Env), // semconv.DeploymentEnvironmentKey.String(o.config.Env), //
semconv.HostNameKey.String(hostName), // semconv.HostNameKey.String(hostName), //
), ),
) )
if err != nil { return
return nil, err
}
return r, nil
} }
func initHTTPExporterAndSpanProcessor(ctx context.Context, conf *Config) (*otlptrace.Exporter, sdktrace.SpanProcessor, error) { func (o *OTEL) initMeterProvider(ctx context.Context) (err error) {
opts := []otlptracehttp.Option{ exporterGrpcFunc := func(ctx context.Context) (sdkmetric.Exporter, error) {
otlptracehttp.WithInsecure(), opts := []otlpmetricgrpc.Option{
otlptracehttp.WithCompression(1), otlpmetricgrpc.WithEndpoint(o.config.EndpointGRPC),
otlpmetricgrpc.WithCompressor(gzip.Name),
}
if o.config.Token != "" {
headers := map[string]string{"Authentication": o.config.Token}
opts = append(opts, otlpmetricgrpc.WithHeaders(headers))
}
exporter, err := otlpmetricgrpc.New(ctx, opts...)
if err != nil {
return nil, err
}
return exporter, nil
} }
if conf.Token != "" { exporterHttpFunc := func(ctx context.Context) (sdkmetric.Exporter, error) {
opts = append(opts, otlptracehttp.WithURLPath(conf.Token)) opts := []otlpmetrichttp.Option{
otlpmetrichttp.WithEndpoint(o.config.EndpointHTTP),
otlpmetrichttp.WithCompression(1),
}
if o.config.Token != "" {
opts = append(opts, otlpmetrichttp.WithURLPath(o.config.Token))
}
exporter, err := otlpmetrichttp.New(ctx, opts...)
if err != nil {
return nil, err
}
return exporter, nil
} }
if conf.EndpointHTTP != "" { var exporter sdkmetric.Exporter
opts = append(opts, otlptracehttp.WithEndpoint(conf.EndpointHTTP)) if o.config.EndpointHTTP != "" {
exporter, err = exporterHttpFunc(ctx)
} else {
exporter, err = exporterGrpcFunc(ctx)
} }
traceExporter, err := otlptrace.New(ctx, otlptracehttp.NewClient(opts...))
if err != nil { if err != nil {
return nil, nil, err return
} }
batchSpanProcessor := sdktrace.NewBatchSpanProcessor(traceExporter) meterProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(
sdkmetric.NewPeriodicReader(exporter),
),
sdkmetric.WithResource(o.Resource),
)
otel.SetMeterProvider(meterProvider)
return traceExporter, batchSpanProcessor, nil container.AddCloseAble(func() {
if err := meterProvider.Shutdown(ctx); err != nil {
otel.Handle(err)
}
})
return
} }
func initGrpcExporterAndSpanProcessor(ctx context.Context, conf *Config) (*otlptrace.Exporter, sdktrace.SpanProcessor, error) { func (o *OTEL) initTracerProvider(ctx context.Context) error {
opts := []otlptracegrpc.Option{ exporterGrpcFunc := func(ctx context.Context) (*otlptrace.Exporter, error) {
otlptracegrpc.WithCompressor(gzip.Name), opts := []otlptracegrpc.Option{
otlptracegrpc.WithCompressor(gzip.Name),
otlptracegrpc.WithEndpoint(o.config.EndpointGRPC),
}
if o.config.Token != "" {
headers := map[string]string{"Authentication": o.config.Token}
opts = append(opts, otlptracegrpc.WithHeaders(headers))
}
exporter, err := otlptrace.New(ctx, otlptracegrpc.NewClient(opts...))
if err != nil {
return nil, err
}
container.AddCloseAble(func() {
cxt, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := exporter.Shutdown(cxt); err != nil {
otel.Handle(err)
}
})
return exporter, nil
} }
if conf.Token != "" { exporterHttpFunc := func(ctx context.Context) (*otlptrace.Exporter, error) {
headers := map[string]string{"Authentication": conf.Token} opts := []otlptracehttp.Option{
opts = append(opts, otlptracegrpc.WithHeaders(headers)) otlptracehttp.WithInsecure(),
otlptracehttp.WithCompression(1),
otlptracehttp.WithEndpoint(o.config.EndpointHTTP),
}
if o.config.Token != "" {
opts = append(opts, otlptracehttp.WithURLPath(o.config.Token))
}
exporter, err := otlptrace.New(ctx, otlptracehttp.NewClient(opts...))
if err != nil {
return nil, err
}
return exporter, nil
} }
if conf.EndpointGRPC != "" { var exporter *otlptrace.Exporter
opts = append(opts, otlptracegrpc.WithEndpoint(conf.EndpointGRPC)) var err error
if o.config.EndpointHTTP != "" {
exporter, err = exporterHttpFunc(ctx)
} else {
exporter, err = exporterGrpcFunc(ctx)
} }
traceExporter, err := otlptrace.New(ctx, otlptracegrpc.NewClient(opts...))
if err != nil { if err != nil {
return nil, nil, err return err
} }
batchSpanProcessor := sdktrace.NewBatchSpanProcessor(traceExporter) traceProvider := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(o.Resource),
sdktrace.WithSpanProcessor(sdktrace.NewBatchSpanProcessor(exporter)),
)
container.AddCloseAble(func() {
if err := traceProvider.Shutdown(ctx); err != nil {
otel.Handle(err)
}
})
return traceExporter, batchSpanProcessor, nil otel.SetTracerProvider(traceProvider)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))
return err
} }