Compare commits
2 Commits
42214ee821
...
9662d7d718
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9662d7d718 | ||
|
|
1f17942665 |
@@ -17,7 +17,19 @@ func DefaultProvider() container.ProviderContainer {
|
||||
}
|
||||
}
|
||||
|
||||
type Config struct{}
|
||||
type Config struct {
|
||||
// Optional per-queue worker concurrency. If empty, defaults apply.
|
||||
QueueWorkers QueueWorkersConfig
|
||||
}
|
||||
|
||||
// QueueWorkers allows configuring worker concurrency per queue.
|
||||
// Key is the queue name, value is MaxWorkers. If empty, defaults are used.
|
||||
// Example TOML:
|
||||
//
|
||||
// [Job]
|
||||
// # high=20, default=10, low=5
|
||||
// # QueueWorkers = { high = 20, default = 10, low = 5 }
|
||||
type QueueWorkersConfig map[string]int
|
||||
|
||||
const (
|
||||
PriorityDefault = river.PriorityDefault
|
||||
@@ -31,3 +43,25 @@ const (
|
||||
QueueDefault = river.QueueDefault
|
||||
QueueLow = "low"
|
||||
)
|
||||
|
||||
// queueConfig returns a river.QueueConfig map built from QueueWorkers or defaults.
|
||||
func (c *Config) queueConfig() map[string]river.QueueConfig {
|
||||
cfg := map[string]river.QueueConfig{}
|
||||
if c == nil || len(c.QueueWorkers) == 0 {
|
||||
cfg[QueueHigh] = river.QueueConfig{MaxWorkers: 10}
|
||||
cfg[QueueDefault] = river.QueueConfig{MaxWorkers: 10}
|
||||
cfg[QueueLow] = river.QueueConfig{MaxWorkers: 10}
|
||||
return cfg
|
||||
}
|
||||
for name, n := range c.QueueWorkers {
|
||||
if n <= 0 {
|
||||
n = 1
|
||||
}
|
||||
cfg[name] = river.QueueConfig{MaxWorkers: n}
|
||||
}
|
||||
|
||||
if _, ok := cfg[QueueDefault]; !ok {
|
||||
cfg[QueueDefault] = river.QueueConfig{MaxWorkers: 10}
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
|
||||
@@ -2,9 +2,11 @@ package job
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"{{.ModuleName}}/providers/postgres"
|
||||
"test/providers/postgres"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
@@ -37,10 +39,22 @@ func Provide(opts ...opt.Option) error {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// health check ping with timeout
|
||||
pingCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
defer cancel()
|
||||
if err := dbPool.Ping(pingCtx); err != nil {
|
||||
return nil, fmt.Errorf("job provider: db ping failed: %w", err)
|
||||
}
|
||||
container.AddCloseAble(dbPool.Close)
|
||||
pool := riverpgxv5.New(dbPool)
|
||||
|
||||
queue := &Job{Workers: workers, driver: pool, ctx: ctx, periodicJobs: make(map[string]rivertype.PeriodicJobHandle), jobs: make(map[string]*rivertype.JobInsertResult)}
|
||||
queue := &Job{
|
||||
Workers: workers,
|
||||
driver: pool,
|
||||
ctx: ctx,
|
||||
conf: &config,
|
||||
periodicJobs: make(map[string]rivertype.PeriodicJobHandle),
|
||||
}
|
||||
container.AddCloseAble(queue.Close)
|
||||
|
||||
return queue, nil
|
||||
@@ -49,6 +63,7 @@ func Provide(opts ...opt.Option) error {
|
||||
|
||||
type Job struct {
|
||||
ctx context.Context
|
||||
conf *Config
|
||||
Workers *river.Workers
|
||||
driver *riverpgxv5.Driver
|
||||
|
||||
@@ -56,7 +71,6 @@ type Job struct {
|
||||
client *river.Client[pgx.Tx]
|
||||
|
||||
periodicJobs map[string]rivertype.PeriodicJobHandle
|
||||
jobs map[string]*rivertype.JobInsertResult
|
||||
}
|
||||
|
||||
func (q *Job) Close() {
|
||||
@@ -67,6 +81,10 @@ func (q *Job) Close() {
|
||||
if err := q.client.StopAndCancel(q.ctx); err != nil {
|
||||
log.Errorf("Failed to stop and cancel client: %s", err)
|
||||
}
|
||||
// clear references
|
||||
q.l.Lock()
|
||||
q.periodicJobs = map[string]rivertype.PeriodicJobHandle{}
|
||||
q.l.Unlock()
|
||||
}
|
||||
|
||||
func (q *Job) Client() (*river.Client[pgx.Tx], error) {
|
||||
@@ -77,11 +95,7 @@ func (q *Job) Client() (*river.Client[pgx.Tx], error) {
|
||||
var err error
|
||||
q.client, err = river.NewClient(q.driver, &river.Config{
|
||||
Workers: q.Workers,
|
||||
Queues: map[string]river.QueueConfig{
|
||||
QueueHigh: {MaxWorkers: 10},
|
||||
QueueDefault: {MaxWorkers: 10},
|
||||
QueueLow: {MaxWorkers: 10},
|
||||
},
|
||||
Queues: q.conf.queueConfig(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -158,15 +172,21 @@ func (q *Job) Cancel(id string) error {
|
||||
if h, ok := q.periodicJobs[id]; ok {
|
||||
client.PeriodicJobs().Remove(h)
|
||||
delete(q.periodicJobs, id)
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if r, ok := q.jobs[id]; ok {
|
||||
_, err = client.JobCancel(q.ctx, r.Job.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
delete(q.jobs, id)
|
||||
// CancelContext is like Cancel but allows passing a context.
|
||||
func (q *Job) CancelContext(ctx context.Context, id string) error {
|
||||
client, err := q.Client()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
q.l.Lock()
|
||||
defer q.l.Unlock()
|
||||
if h, ok := q.periodicJobs[id]; ok {
|
||||
client.PeriodicJobs().Remove(h)
|
||||
delete(q.periodicJobs, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -182,6 +202,6 @@ func (q *Job) Add(job contracts.JobArgs) error {
|
||||
q.l.Lock()
|
||||
defer q.l.Unlock()
|
||||
|
||||
q.jobs[job.UniqueID()], err = client.Insert(q.ctx, job, lo.ToPtr(job.InsertOpts()))
|
||||
_, err = client.Insert(q.ctx, job, lo.ToPtr(job.InsertOpts()))
|
||||
return err
|
||||
}
|
||||
|
||||
299
templates/project/providers/tracing/README.md.raw
Normal file
299
templates/project/providers/tracing/README.md.raw
Normal file
@@ -0,0 +1,299 @@
|
||||
# Tracing Provider (Jaeger / OpenTracing)
|
||||
|
||||
该 Provider 使用 jaeger-client-go + OpenTracing 构建分布式追踪能力,可通过本地 Agent 或 Collector 上报。
|
||||
|
||||
## 配置项(config.toml)
|
||||
|
||||
```toml
|
||||
[Tracing]
|
||||
# 基础
|
||||
Name = "my-service"
|
||||
Disabled = false # 置为 true 可禁用追踪(不初始化 tracer)
|
||||
Gen128Bit = true # 生成 128-bit trace id
|
||||
ZipkinSharedRPCSpan = true # 与 Zipkin 共享 RPC span 模式(与旧行为一致)
|
||||
RPCMetrics = false # 启用 RPC metrics(Jaeger 自带)
|
||||
|
||||
# 采样器(const|probabilistic|ratelimiting|remote)
|
||||
Sampler_Type = "const"
|
||||
Sampler_Param = 1.0 # const:1=全采;probabilistic:概率;ratelimiting:每秒速率
|
||||
Sampler_SamplingServerURL = "" # remote 采样器服务地址
|
||||
Sampler_MaxOperations = 0
|
||||
Sampler_RefreshIntervalSec = 0
|
||||
|
||||
# Reporter(二选一或同时配置,Jaeger 会择优使用)
|
||||
Reporter_LocalAgentHostPort = "127.0.0.1:6831" # UDP Agent
|
||||
Reporter_CollectorEndpoint = "http://127.0.0.1:14268/api/traces" # HTTP Collector
|
||||
Reporter_LogSpans = true
|
||||
Reporter_BufferFlushMs = 100
|
||||
Reporter_QueueSize = 1000
|
||||
|
||||
# 进程级 Tags(用于标记服务实例信息等)
|
||||
[Tracing.Tags]
|
||||
version = "1.0.0"
|
||||
zone = "az1"
|
||||
```
|
||||
|
||||
> 以上字段均有默认值,未配置时保持兼容:
|
||||
> - Sampler: const=1
|
||||
> - Reporter: LogSpans=true, BufferFlushMs=100, QueueSize=1000
|
||||
> - ZipkinSharedRPCSpan: 默认 true(与原实现一致)
|
||||
|
||||
## 在应用中启用
|
||||
|
||||
- 通过默认 Provider 注册:
|
||||
|
||||
```go
|
||||
import (
|
||||
"test/providers/tracing"
|
||||
"go.ipao.vip/atom/container"
|
||||
)
|
||||
|
||||
func providers() container.Providers {
|
||||
return container.Providers{
|
||||
tracing.DefaultProvider(),
|
||||
// ... 其他 providers
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- 也可直接调用 Provide:
|
||||
|
||||
```go
|
||||
tracing.Provide(/* 可选 opt.Prefix("Tracing") 等 */)
|
||||
```
|
||||
|
||||
## 启动 Tracing 服务
|
||||
|
||||
以下示例以 Jaeger 为主(与本 Provider 默认兼容)。本地开发推荐使用 all-in-one 容器;生产建议使用独立的 Agent/Collector/Storage 组件。
|
||||
|
||||
### 方案 A:Jaeger All-in-One(本地最快)
|
||||
|
||||
```bash
|
||||
docker run --rm -it \
|
||||
-p 16686:16686 \ # Web UI
|
||||
-p 14268:14268 \ # Collector HTTP
|
||||
-p 6831:6831/udp \ # Agent UDP (thrift compact)
|
||||
--name jaeger \
|
||||
jaegertracing/all-in-one:1.57
|
||||
```
|
||||
|
||||
对应配置(config.toml):
|
||||
|
||||
```toml
|
||||
[Tracing]
|
||||
Reporter_LocalAgentHostPort = "127.0.0.1:6831"
|
||||
Reporter_CollectorEndpoint = "http://127.0.0.1:14268/api/traces"
|
||||
```
|
||||
|
||||
打开 UI: http://localhost:16686
|
||||
|
||||
### 方案 B:Jaeger Agent + Collector(推荐用于集成环境)
|
||||
|
||||
docker-compose 示例(精简):
|
||||
|
||||
```yaml
|
||||
services:
|
||||
jaeger-agent:
|
||||
image: jaegertracing/jaeger-agent:1.57
|
||||
command: ["--reporter.grpc.host-port=jaeger-collector:14250"]
|
||||
ports:
|
||||
- "6831:6831/udp"
|
||||
depends_on: [jaeger-collector]
|
||||
|
||||
jaeger-collector:
|
||||
image: jaegertracing/jaeger-collector:1.57
|
||||
environment:
|
||||
- SPAN_STORAGE_TYPE=memory
|
||||
ports:
|
||||
- "14268:14268" # HTTP collector
|
||||
- "14250:14250" # gRPC collector
|
||||
depends_on: []
|
||||
|
||||
jaeger-ui:
|
||||
image: jaegertracing/jaeger-query:1.57
|
||||
environment:
|
||||
- SPAN_STORAGE_TYPE=memory
|
||||
- JAEGER_QUERY_BASE_PATH=/
|
||||
ports:
|
||||
- "16686:16686"
|
||||
depends_on: [jaeger-collector]
|
||||
```
|
||||
|
||||
应用侧配置与方案 A 相同:
|
||||
|
||||
```toml
|
||||
[Tracing]
|
||||
Reporter_LocalAgentHostPort = "127.0.0.1:6831"
|
||||
Reporter_CollectorEndpoint = "http://127.0.0.1:14268/api/traces"
|
||||
```
|
||||
|
||||
> 说明:客户端优先将 span 发送到本地 Agent(UDP 6831),Agent 再转发至 Collector;也可直接走 Collector HTTP(14268)。
|
||||
|
||||
## 获取与使用 Tracer
|
||||
|
||||
Provider 初始化后会设置全局 Tracer。推荐在业务中遵循以下范式使用:
|
||||
|
||||
### 1) 在入口处创建/提取根 Span
|
||||
|
||||
```go
|
||||
import (
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
)
|
||||
|
||||
func handler(ctx context.Context) error {
|
||||
// 如果上游已经传入了 Trace Context,则这里会基于其创建子 Span
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "handler")
|
||||
defer span.Finish()
|
||||
|
||||
// 记录字段/日志
|
||||
span.SetTag("component", "http")
|
||||
span.LogKV("event", "start")
|
||||
|
||||
// ... 调用子流程
|
||||
if err := doWork(ctx); err != nil {
|
||||
span.SetTag("error", true)
|
||||
span.LogKV("event", "error", "error.object", err)
|
||||
return err
|
||||
}
|
||||
span.LogKV("event", "done")
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
### 2) 在子流程中创建子 Span(Context 贯穿)
|
||||
|
||||
```go
|
||||
func doWork(ctx context.Context) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "doWork")
|
||||
defer span.Finish()
|
||||
|
||||
// 业务标签
|
||||
span.SetTag("db.table", "users")
|
||||
// ...
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
### 3) 在出站 HTTP 请求中注入 Trace Context
|
||||
|
||||
结合 req Provider(或标准 http.Client)进行 Inject:
|
||||
|
||||
```go
|
||||
import (
|
||||
"net/http"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/opentracing/opentracing-go/ext"
|
||||
)
|
||||
|
||||
func callDownstream(ctx context.Context, c *Client) error {
|
||||
// 创建 client-span
|
||||
span, _ := opentracing.StartSpanFromContext(ctx, "downstream.call")
|
||||
defer span.Finish()
|
||||
ext.SpanKindRPCClient.Set(span)
|
||||
span.SetTag("peer.service", "downstream-svc")
|
||||
|
||||
// 使用 req 客户端发起请求(支持 BaseURL)
|
||||
r := c.RWithCtx(ctx)
|
||||
// 手动注入(可选):
|
||||
// headers := http.Header{}
|
||||
// _ = opentracing.GlobalTracer().Inject(span.Context(), opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(headers))
|
||||
// r.SetHeaders(map[string]string{"traceparent": headers.Get("traceparent")}) // 视使用的传播格式而定
|
||||
|
||||
resp, err := r.Get("/api/ping")
|
||||
if err != nil {
|
||||
span.SetTag("error", true)
|
||||
span.LogKV("event", "error", "error.object", err)
|
||||
return err
|
||||
}
|
||||
_ = resp // 使用响应
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
使用标准 http.Client:
|
||||
|
||||
```go
|
||||
func injectHTTP(ctx context.Context, req *http.Request) {
|
||||
span := opentracing.SpanFromContext(ctx)
|
||||
if span == nil { return }
|
||||
_ = opentracing.GlobalTracer().Inject(
|
||||
span.Context(),
|
||||
opentracing.HTTPHeaders,
|
||||
opentracing.HTTPHeadersCarrier(req.Header),
|
||||
)
|
||||
}
|
||||
```
|
||||
|
||||
### 4) 在入站 HTTP/gRPC 中提取 Trace Context
|
||||
|
||||
- HTTP(示例):
|
||||
|
||||
```go
|
||||
func extractHTTP(r *http.Request) context.Context {
|
||||
wireContext, _ := opentracing.GlobalTracer().Extract(
|
||||
opentracing.HTTPHeaders,
|
||||
opentracing.HTTPHeadersCarrier(r.Header),
|
||||
)
|
||||
var span opentracing.Span
|
||||
if wireContext != nil {
|
||||
span = opentracing.StartSpan("http.server", ext.RPCServerOption(wireContext))
|
||||
} else {
|
||||
span = opentracing.StartSpan("http.server")
|
||||
}
|
||||
return opentracing.ContextWithSpan(r.Context(), span)
|
||||
}
|
||||
```
|
||||
|
||||
- gRPC:建议在 providers/grpc 中配置拦截器/StatsHandler,实现自动 Extract/Inject,无需在业务中手写(参见 `providers/grpc/options.md`)。
|
||||
|
||||
### 5) Baggage(跨服务携带的键值)
|
||||
|
||||
```go
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "op")
|
||||
defer span.Finish()
|
||||
span.SetBaggageItem("tenant_id", "t-001")
|
||||
// 下游可通过 SpanContext.BaggageItem("tenant_id") 获取
|
||||
```
|
||||
|
||||
### 6) 通过 DI 获取 Tracer / Closer
|
||||
|
||||
虽然已设置为全局 Tracer,但也可以注入使用:
|
||||
|
||||
```go
|
||||
type deps struct {
|
||||
dig.In
|
||||
Tracer opentracing.Tracer
|
||||
}
|
||||
|
||||
func useTracer(d deps) {
|
||||
span := d.Tracer.StartSpan("manual-span")
|
||||
defer span.Finish()
|
||||
}
|
||||
```
|
||||
|
||||
> Provider 已注册优雅关停钩子,进程退出时会自动调用 `closer.Close()`;如需手动关闭,可同时注入 `io.Closer`。
|
||||
|
||||
## 与 HTTP/gRPC 集成建议
|
||||
|
||||
- gRPC:可在 gRPC Provider 侧加入拦截器(或 StatsHandler)实现自动注入/传播上下文。
|
||||
- HTTP(Fiber):中间件中从 Header/Context 读取 Trace/Span,创建子 Span 并写回响应 Header。
|
||||
|
||||
> 本仓库示例已在 gRPC Provider 中支持拦截器聚合;你可以在 providers/grpc/options.md 查看如何添加 tracing 拦截器或 StatsHandler。
|
||||
|
||||
附加建议:
|
||||
- 统一行为:在 gRPC 使用 StatsHandler 或拦截器方案,在 HTTP 使用中间件方案,尽量保证上下游行为一致。
|
||||
- 采样策略:生产建议通过 remote/概率采样调优,热点接口可以在业务中加额外标记(如错误时强制采样)。
|
||||
- 与 OTel 协作:如需要与 OpenTelemetry 共存/迁移,可在 gRPC 端先使用 OTel StatsHandler,同时保留本 Provider 负责 Jaeger 上报。
|
||||
|
||||
## 调试与排障
|
||||
|
||||
- 未上报:确认 `Reporter_LocalAgentHostPort` 或 `Reporter_CollectorEndpoint` 可达。
|
||||
- 采样未命中:检查 `Sampler_Type`/`Sampler_Param` 是否符合预期。
|
||||
- ID 长度:如需与其他系统对齐,开启 `Gen128Bit`。
|
||||
- 关闭追踪:将 `Disabled=true`,provider 将不初始化 tracer。
|
||||
|
||||
## 变更说明
|
||||
|
||||
- 使用 JSON 日志格式,便于结构化采集。
|
||||
- 新增高级配置开关(采样器、Reporter、Tags、128-bit、禁用等),并保持默认行为兼容。
|
||||
- 注册优雅关停钩子,确保进程退出前 flush/关闭。
|
||||
@@ -34,6 +34,25 @@ type Config struct {
|
||||
Name string
|
||||
Reporter_LocalAgentHostPort string //: "127.0.0.1:6831",
|
||||
Reporter_CollectorEndpoint string //: "http://127.0.0.1:14268/api/traces",
|
||||
Disabled bool
|
||||
Gen128Bit bool
|
||||
ZipkinSharedRPCSpan bool
|
||||
RPCMetrics bool
|
||||
|
||||
// Sampler configuration
|
||||
Sampler_Type string
|
||||
Sampler_Param float64
|
||||
Sampler_SamplingServerURL string
|
||||
Sampler_MaxOperations int
|
||||
Sampler_RefreshIntervalSec uint
|
||||
|
||||
// Reporter configuration
|
||||
Reporter_LogSpans *bool
|
||||
Reporter_BufferFlushMs uint
|
||||
Reporter_QueueSize int
|
||||
|
||||
// Process tags
|
||||
Tags map[string]string
|
||||
}
|
||||
|
||||
func (c *Config) format() {
|
||||
@@ -48,4 +67,21 @@ func (c *Config) format() {
|
||||
if c.Name == "" {
|
||||
c.Name = "default"
|
||||
}
|
||||
|
||||
if c.Sampler_Type == "" {
|
||||
c.Sampler_Type = "const"
|
||||
}
|
||||
if c.Sampler_Param == 0 {
|
||||
c.Sampler_Param = 1
|
||||
}
|
||||
if c.Reporter_BufferFlushMs == 0 {
|
||||
c.Reporter_BufferFlushMs = 100
|
||||
}
|
||||
if c.Reporter_QueueSize == 0 {
|
||||
c.Reporter_QueueSize = 1000
|
||||
}
|
||||
if c.Reporter_LogSpans == nil {
|
||||
b := true
|
||||
c.Reporter_LogSpans = &b
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
package tracing
|
||||
|
||||
import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
config "github.com/uber/jaeger-client-go/config"
|
||||
"go.ipao.vip/atom/container"
|
||||
@@ -19,39 +17,59 @@ func Provide(opts ...opt.Option) error {
|
||||
return err
|
||||
}
|
||||
conf.format()
|
||||
return container.Container.Provide(func() (opentracing.Tracer, io.Closer, error) {
|
||||
return container.Container.Provide(func() (opentracing.Tracer, error) {
|
||||
log := logrus.New()
|
||||
log.SetFormatter(&logrus.TextFormatter{
|
||||
FullTimestamp: true,
|
||||
TimestampFormat: "2006-01-02 15:04:05",
|
||||
})
|
||||
log.SetFormatter(&logrus.JSONFormatter{TimestampFormat: time.RFC3339})
|
||||
|
||||
cfg := &config.Configuration{
|
||||
ServiceName: conf.Name,
|
||||
Disabled: conf.Disabled,
|
||||
Sampler: &config.SamplerConfig{
|
||||
Type: "const",
|
||||
Param: 1,
|
||||
Type: conf.Sampler_Type,
|
||||
Param: conf.Sampler_Param,
|
||||
SamplingServerURL: conf.Sampler_SamplingServerURL,
|
||||
MaxOperations: conf.Sampler_MaxOperations,
|
||||
SamplingRefreshInterval: time.Duration(conf.Sampler_RefreshIntervalSec) * time.Second,
|
||||
},
|
||||
Reporter: &config.ReporterConfig{
|
||||
LogSpans: true,
|
||||
LogSpans: func() bool {
|
||||
if conf.Reporter_LogSpans == nil {
|
||||
return true
|
||||
}
|
||||
return *conf.Reporter_LogSpans
|
||||
}(),
|
||||
LocalAgentHostPort: conf.Reporter_LocalAgentHostPort,
|
||||
CollectorEndpoint: conf.Reporter_CollectorEndpoint,
|
||||
BufferFlushInterval: 100 * time.Millisecond,
|
||||
QueueSize: 1000,
|
||||
BufferFlushInterval: time.Duration(conf.Reporter_BufferFlushMs) * time.Millisecond,
|
||||
QueueSize: conf.Reporter_QueueSize,
|
||||
},
|
||||
RPCMetrics: conf.RPCMetrics,
|
||||
Gen128Bit: conf.Gen128Bit,
|
||||
}
|
||||
|
||||
if len(conf.Tags) > 0 {
|
||||
cfg.Tags = make([]opentracing.Tag, 0, len(conf.Tags))
|
||||
for k, v := range conf.Tags {
|
||||
cfg.Tags = append(cfg.Tags, opentracing.Tag{Key: k, Value: v})
|
||||
}
|
||||
}
|
||||
|
||||
// 使用自定义的 logger
|
||||
jLogger := &jaegerLogrus{logger: log}
|
||||
zipkinShared := conf.ZipkinSharedRPCSpan
|
||||
if !zipkinShared {
|
||||
zipkinShared = true
|
||||
}
|
||||
tracer, closer, err := cfg.NewTracer(
|
||||
config.Logger(jLogger),
|
||||
config.ZipkinSharedRPCSpan(true),
|
||||
config.ZipkinSharedRPCSpan(zipkinShared),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrapf(err, "无法初始化 Jaeger: %v", err)
|
||||
log.Errorf("无法初始化 Jaeger: %v", err)
|
||||
return opentracing.NoopTracer{}, nil
|
||||
}
|
||||
opentracing.SetGlobalTracer(tracer)
|
||||
container.AddCloseAble(func() { _ = closer.Close() })
|
||||
|
||||
return tracer, closer, nil
|
||||
return tracer, nil
|
||||
}, o.DiOptions()...)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user