Compare commits

...

2 Commits

5 changed files with 440 additions and 33 deletions

View File

@@ -17,7 +17,19 @@ func DefaultProvider() container.ProviderContainer {
}
}
type Config struct{}
type Config struct {
// Optional per-queue worker concurrency. If empty, defaults apply.
QueueWorkers QueueWorkersConfig
}
// QueueWorkersConfig configures worker concurrency per queue.
// Key is the queue name, value is MaxWorkers. If empty, defaults are used.
// Example TOML:
//
// [Job]
// # high=20, default=10, low=5
// # QueueWorkers = { high = 20, default = 10, low = 5 }
type QueueWorkersConfig map[string]int
const (
PriorityDefault = river.PriorityDefault
@@ -31,3 +43,25 @@ const (
QueueDefault = river.QueueDefault
QueueLow = "low"
)
// queueConfig returns a river.QueueConfig map built from QueueWorkers or defaults.
func (c *Config) queueConfig() map[string]river.QueueConfig {
cfg := map[string]river.QueueConfig{}
if c == nil || len(c.QueueWorkers) == 0 {
cfg[QueueHigh] = river.QueueConfig{MaxWorkers: 10}
cfg[QueueDefault] = river.QueueConfig{MaxWorkers: 10}
cfg[QueueLow] = river.QueueConfig{MaxWorkers: 10}
return cfg
}
for name, n := range c.QueueWorkers {
if n <= 0 {
n = 1
}
cfg[name] = river.QueueConfig{MaxWorkers: n}
}
if _, ok := cfg[QueueDefault]; !ok {
cfg[QueueDefault] = river.QueueConfig{MaxWorkers: 10}
}
return cfg
}
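
As a quick illustration of the resolution rules above (a sketch, not part of the change):

```go
cfg := (&Config{QueueWorkers: QueueWorkersConfig{"high": 20}}).queueConfig()
// cfg["high"] == river.QueueConfig{MaxWorkers: 20}
// cfg["default"] is backfilled with MaxWorkers: 10;
// "low" is absent because only configured queues (plus "default") are registered.
```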

View File

@@ -2,9 +2,11 @@ package job
import (
"context"
"fmt"
"sync"
"time"
"{{.ModuleName}}/providers/postgres"
"test/providers/postgres"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
@@ -37,10 +39,22 @@ func Provide(opts ...opt.Option) error {
if err != nil {
return nil, err
}
// health check ping with timeout
pingCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
if err := dbPool.Ping(pingCtx); err != nil {
return nil, fmt.Errorf("job provider: db ping failed: %w", err)
}
container.AddCloseAble(dbPool.Close)
pool := riverpgxv5.New(dbPool)
queue := &Job{Workers: workers, driver: pool, ctx: ctx, periodicJobs: make(map[string]rivertype.PeriodicJobHandle), jobs: make(map[string]*rivertype.JobInsertResult)}
queue := &Job{
Workers: workers,
driver: pool,
ctx: ctx,
conf: &config,
periodicJobs: make(map[string]rivertype.PeriodicJobHandle),
}
container.AddCloseAble(queue.Close)
return queue, nil
@@ -49,6 +63,7 @@ func Provide(opts ...opt.Option) error {
type Job struct {
ctx context.Context
conf *Config
Workers *river.Workers
driver *riverpgxv5.Driver
@@ -56,7 +71,6 @@ type Job struct {
client *river.Client[pgx.Tx]
periodicJobs map[string]rivertype.PeriodicJobHandle
jobs map[string]*rivertype.JobInsertResult
}
func (q *Job) Close() {
@@ -67,6 +81,10 @@ func (q *Job) Close() {
if err := q.client.StopAndCancel(q.ctx); err != nil {
log.Errorf("Failed to stop and cancel client: %s", err)
}
// clear references
q.l.Lock()
q.periodicJobs = map[string]rivertype.PeriodicJobHandle{}
q.l.Unlock()
}
func (q *Job) Client() (*river.Client[pgx.Tx], error) {
@@ -77,11 +95,7 @@ func (q *Job) Client() (*river.Client[pgx.Tx], error) {
var err error
q.client, err = river.NewClient(q.driver, &river.Config{
Workers: q.Workers,
Queues: map[string]river.QueueConfig{
QueueHigh: {MaxWorkers: 10},
QueueDefault: {MaxWorkers: 10},
QueueLow: {MaxWorkers: 10},
},
Queues: q.conf.queueConfig(),
})
if err != nil {
return nil, err
@@ -158,15 +172,21 @@ func (q *Job) Cancel(id string) error {
if h, ok := q.periodicJobs[id]; ok {
client.PeriodicJobs().Remove(h)
delete(q.periodicJobs, id)
return nil
}
return nil
}
if r, ok := q.jobs[id]; ok {
_, err = client.JobCancel(q.ctx, r.Job.ID)
if err != nil {
return err
}
delete(q.jobs, id)
// CancelContext is like Cancel but allows passing a context.
func (q *Job) CancelContext(ctx context.Context, id string) error {
client, err := q.Client()
if err != nil {
return err
}
q.l.Lock()
defer q.l.Unlock()
if h, ok := q.periodicJobs[id]; ok {
client.PeriodicJobs().Remove(h)
delete(q.periodicJobs, id)
return nil
}
@@ -182,6 +202,6 @@ func (q *Job) Add(job contracts.JobArgs) error {
q.l.Lock()
defer q.l.Unlock()
q.jobs[job.UniqueID()], err = client.Insert(q.ctx, job, lo.ToPtr(job.InsertOpts()))
_, err = client.Insert(q.ctx, job, lo.ToPtr(job.InsertOpts()))
return err
}
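
For context, a hedged sketch of a job type that could be passed to Add (contracts.JobArgs is this repo's interface; the assumption that it combines river.JobArgs with UniqueID/InsertOpts, and all names below, are illustrative):

```go
// EmailArgs is a hypothetical job payload.
type EmailArgs struct {
	To string `json:"to"`
}

// Kind satisfies river.JobArgs.
func (EmailArgs) Kind() string { return "email" }

// UniqueID and InsertOpts are assumed members of contracts.JobArgs.
func (a EmailArgs) UniqueID() string { return "email:" + a.To }
func (EmailArgs) InsertOpts() river.InsertOpts {
	return river.InsertOpts{Queue: QueueHigh}
}
```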

View File

@@ -0,0 +1,299 @@
# Tracing Provider (Jaeger / OpenTracing)
This provider builds distributed tracing on jaeger-client-go + OpenTracing; spans can be reported through a local Agent or a Collector.
## Configuration (config.toml)
```toml
[Tracing]
# Basics
Name = "my-service"
Disabled = false            # set to true to disable tracing (the tracer is not initialized)
Gen128Bit = true            # generate 128-bit trace IDs
ZipkinSharedRPCSpan = true  # share RPC spans Zipkin-style (matches the legacy behavior)
RPCMetrics = false          # enable RPC metrics (built into Jaeger)

# Sampler: const | probabilistic | ratelimiting | remote
Sampler_Type = "const"
Sampler_Param = 1.0             # const: 1 = sample everything; probabilistic: probability; ratelimiting: spans per second
Sampler_SamplingServerURL = ""  # remote sampling server address
Sampler_MaxOperations = 0
Sampler_RefreshIntervalSec = 0

# Reporter: configure either or both (Jaeger uses the preferred one)
Reporter_LocalAgentHostPort = "127.0.0.1:6831"                    # UDP Agent
Reporter_CollectorEndpoint = "http://127.0.0.1:14268/api/traces"  # HTTP Collector
Reporter_LogSpans = true
Reporter_BufferFlushMs = 100
Reporter_QueueSize = 1000

# Process-level tags (e.g. to label service instance metadata)
[Tracing.Tags]
version = "1.0.0"
zone = "az1"
```
> All of the fields above have defaults; leaving them unset keeps the legacy behavior:
> - Sampler: const = 1
> - Reporter: LogSpans=true, BufferFlushMs=100, QueueSize=1000
> - ZipkinSharedRPCSpan: defaults to true (same as the original implementation)
## Enabling it in an application
- Register via the default provider:
```go
import (
"test/providers/tracing"
"go.ipao.vip/atom/container"
)
func providers() container.Providers {
return container.Providers{
tracing.DefaultProvider(),
// ... other providers
}
}
```
- Or call Provide directly:
```go
tracing.Provide(/* optionally opt.Prefix("Tracing"), etc. */)
```
## Running a tracing backend
The examples below focus on Jaeger (compatible with this provider's defaults). For local development the all-in-one container is the fastest path; in production, run separate Agent/Collector/Storage components.
### Option A: Jaeger All-in-One (fastest locally)
```bash
# Ports: 16686 Web UI, 14268 Collector HTTP, 6831/udp Agent UDP (thrift compact)
docker run --rm -it \
  -p 16686:16686 \
  -p 14268:14268 \
  -p 6831:6831/udp \
  --name jaeger \
  jaegertracing/all-in-one:1.57
```
Matching configuration (config.toml):
```toml
[Tracing]
Reporter_LocalAgentHostPort = "127.0.0.1:6831"
Reporter_CollectorEndpoint = "http://127.0.0.1:14268/api/traces"
```
Open the UI at http://localhost:16686
### Option B: Jaeger Agent + Collector (recommended for integration environments)
A trimmed-down docker-compose example:
```yaml
services:
jaeger-agent:
image: jaegertracing/jaeger-agent:1.57
command: ["--reporter.grpc.host-port=jaeger-collector:14250"]
ports:
- "6831:6831/udp"
depends_on: [jaeger-collector]
jaeger-collector:
image: jaegertracing/jaeger-collector:1.57
environment:
- SPAN_STORAGE_TYPE=memory
ports:
- "14268:14268" # HTTP collector
- "14250:14250" # gRPC collector
depends_on: []
jaeger-ui:
image: jaegertracing/jaeger-query:1.57
environment:
- SPAN_STORAGE_TYPE=memory
- JAEGER_QUERY_BASE_PATH=/
ports:
- "16686:16686"
depends_on: [jaeger-collector]
```
The application-side configuration is the same as in Option A:
```toml
[Tracing]
Reporter_LocalAgentHostPort = "127.0.0.1:6831"
Reporter_CollectorEndpoint = "http://127.0.0.1:14268/api/traces"
```
> Note: the client prefers sending spans to the local Agent (UDP 6831), which forwards them to the Collector; spans can also be sent directly to the Collector over HTTP (14268).
## Obtaining and using the Tracer
Once initialized, the provider sets the global Tracer. The following patterns are recommended in application code.
### 1) Create/extract the root span at the entry point
```go
import (
	"context"

	opentracing "github.com/opentracing/opentracing-go"
)

func handler(ctx context.Context) error {
	// If the upstream passed trace context, this creates a child span of it
	span, ctx := opentracing.StartSpanFromContext(ctx, "handler")
	defer span.Finish()
	// record tags/logs
	span.SetTag("component", "http")
	span.LogKV("event", "start")
	// ... invoke sub-flows
	if err := doWork(ctx); err != nil {
		span.SetTag("error", true)
		span.LogKV("event", "error", "error.object", err)
		return err
	}
	span.LogKV("event", "done")
	return nil
}
```
### 2) Create child spans in sub-flows (the Context carries the trace through)
```go
func doWork(ctx context.Context) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "doWork")
defer span.Finish()
// business tags
span.SetTag("db.table", "users")
// ...
return nil
}
```
### 3) Inject trace context into outbound HTTP requests
Inject via the req provider (or the standard http.Client):
```go
import (
	"context"

	// "net/http" // only needed for the manual injection shown below
	opentracing "github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
)

func callDownstream(ctx context.Context, c *Client) error {
	// create the client span
	span, _ := opentracing.StartSpanFromContext(ctx, "downstream.call")
	defer span.Finish()
	ext.SpanKindRPCClient.Set(span)
	span.SetTag("peer.service", "downstream-svc")
	// issue the request via the req client (BaseURL is supported)
	r := c.RWithCtx(ctx)
	// manual injection (optional):
	// headers := http.Header{}
	// _ = opentracing.GlobalTracer().Inject(span.Context(), opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(headers))
	// r.SetHeaders(map[string]string{"traceparent": headers.Get("traceparent")}) // depends on the propagation format in use
	resp, err := r.Get("/api/ping")
	if err != nil {
		span.SetTag("error", true)
		span.LogKV("event", "error", "error.object", err)
		return err
	}
	_ = resp // use the response
	return nil
}
```
With the standard http.Client:
```go
func injectHTTP(ctx context.Context, req *http.Request) {
span := opentracing.SpanFromContext(ctx)
	if span == nil {
		return
	}
_ = opentracing.GlobalTracer().Inject(
span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header),
)
}
```
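A hypothetical call site (the downstream URL is a placeholder):

```go
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://downstream/api/ping", nil)
if err != nil {
	return err
}
injectHTTP(ctx, req) // adds trace headers when a span is active in ctx
resp, err := http.DefaultClient.Do(req)
// ... handle resp/err
```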
### 4) Extract trace context from inbound HTTP/gRPC
- HTTP example (a middleware sketch follows this list):
```go
func extractHTTP(r *http.Request) context.Context {
wireContext, _ := opentracing.GlobalTracer().Extract(
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(r.Header),
)
var span opentracing.Span
if wireContext != nil {
span = opentracing.StartSpan("http.server", ext.RPCServerOption(wireContext))
} else {
span = opentracing.StartSpan("http.server")
}
return opentracing.ContextWithSpan(r.Context(), span)
}
```
- gRPC: configure interceptors or a StatsHandler in providers/grpc so Extract/Inject happens automatically, with no hand-written code in business logic (see `providers/grpc/options.md`).
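Building on the HTTP example above, a minimal middleware sketch (`extractHTTP` is the helper defined earlier; the middleware name is illustrative):

```go
func tracingMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := extractHTTP(r) // always places a span in ctx
		defer opentracing.SpanFromContext(ctx).Finish()
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}
```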
### 5) Baggage (key/value pairs carried across services)
```go
span, ctx := opentracing.StartSpanFromContext(ctx, "op")
defer span.Finish()
span.SetBaggageItem("tenant_id", "t-001")
// downstream services can read it back via span.BaggageItem("tenant_id")
```
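On the receiving side, a sketch of reading the baggage back:

```go
if sp := opentracing.SpanFromContext(ctx); sp != nil {
	tenantID := sp.BaggageItem("tenant_id") // "t-001" when propagated
	_ = tenantID
}
```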
### 6) Obtaining the Tracer via DI
Although the Tracer is set globally, it can also be injected:
```go
type deps struct {
dig.In
Tracer opentracing.Tracer
}
func useTracer(d deps) {
span := d.Tracer.StartSpan("manual-span")
defer span.Finish()
}
```
> The provider registers a graceful-shutdown hook that calls `closer.Close()` automatically when the process exits; the closer itself is no longer exposed through DI.
## HTTP/gRPC integration suggestions
- gRPC: add interceptors (or a StatsHandler) on the gRPC provider side to inject/propagate context automatically.
- HTTP (Fiber): in middleware, read the trace/span from headers/context, create a child span, and write it back to the response headers.
> The gRPC provider in this repository already supports interceptor aggregation; see providers/grpc/options.md for how to add a tracing interceptor or StatsHandler.
Additional suggestions:
- Consistent behavior: use the StatsHandler/interceptor approach for gRPC and the middleware approach for HTTP, keeping upstream and downstream behavior as uniform as possible.
- Sampling strategy: in production, tune via remote or probabilistic sampling; hot endpoints can add extra markers in business code (e.g. forcing sampling on errors).
- Working with OTel: to coexist with or migrate to OpenTelemetry, you can adopt the OTel StatsHandler on the gRPC side first while this provider keeps handling Jaeger reporting.
## Debugging and troubleshooting
- Nothing reported: verify that `Reporter_LocalAgentHostPort` or `Reporter_CollectorEndpoint` is reachable.
- Sampling misses: check that `Sampler_Type`/`Sampler_Param` match expectations.
- ID length: enable `Gen128Bit` when trace IDs need to align with other systems.
- Turning tracing off: set `Disabled = true`; the provider will not initialize a tracer.
## Change notes
- Switches to JSON log output for structured collection.
- Adds advanced configuration switches (sampler, reporter, tags, 128-bit IDs, disable flag) while keeping default behavior compatible.
- Registers a graceful-shutdown hook to flush/close before the process exits.

View File

@@ -34,6 +34,25 @@ type Config struct {
Name string
Reporter_LocalAgentHostPort string // e.g. "127.0.0.1:6831"
Reporter_CollectorEndpoint string // e.g. "http://127.0.0.1:14268/api/traces"
Disabled bool
Gen128Bit bool
ZipkinSharedRPCSpan *bool // nil defaults to true (legacy shared RPC span behavior)
RPCMetrics bool
// Sampler configuration
Sampler_Type string
Sampler_Param float64
Sampler_SamplingServerURL string
Sampler_MaxOperations int
Sampler_RefreshIntervalSec uint
// Reporter configuration
Reporter_LogSpans *bool
Reporter_BufferFlushMs uint
Reporter_QueueSize int
// Process tags
Tags map[string]string
}
func (c *Config) format() {
@@ -48,4 +67,21 @@ func (c *Config) format() {
if c.Name == "" {
c.Name = "default"
}
if c.Sampler_Type == "" {
c.Sampler_Type = "const"
}
if c.Sampler_Param == 0 {
c.Sampler_Param = 1
}
if c.Reporter_BufferFlushMs == 0 {
c.Reporter_BufferFlushMs = 100
}
if c.Reporter_QueueSize == 0 {
c.Reporter_QueueSize = 1000
}
if c.Reporter_LogSpans == nil {
b := true
c.Reporter_LogSpans = &b
}
if c.ZipkinSharedRPCSpan == nil {
b := true
c.ZipkinSharedRPCSpan = &b
}
}

View File

@@ -1,11 +1,9 @@
package tracing
import (
"io"
"time"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
config "github.com/uber/jaeger-client-go/config"
"go.ipao.vip/atom/container"
@@ -19,39 +17,59 @@ func Provide(opts ...opt.Option) error {
return err
}
conf.format()
return container.Container.Provide(func() (opentracing.Tracer, io.Closer, error) {
return container.Container.Provide(func() (opentracing.Tracer, error) {
log := logrus.New()
log.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
TimestampFormat: "2006-01-02 15:04:05",
})
log.SetFormatter(&logrus.JSONFormatter{TimestampFormat: time.RFC3339})
cfg := &config.Configuration{
ServiceName: conf.Name,
Disabled: conf.Disabled,
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
Type: conf.Sampler_Type,
Param: conf.Sampler_Param,
SamplingServerURL: conf.Sampler_SamplingServerURL,
MaxOperations: conf.Sampler_MaxOperations,
SamplingRefreshInterval: time.Duration(conf.Sampler_RefreshIntervalSec) * time.Second,
},
Reporter: &config.ReporterConfig{
LogSpans: true,
LogSpans: func() bool {
if conf.Reporter_LogSpans == nil {
return true
}
return *conf.Reporter_LogSpans
}(),
LocalAgentHostPort: conf.Reporter_LocalAgentHostPort,
CollectorEndpoint: conf.Reporter_CollectorEndpoint,
BufferFlushInterval: 100 * time.Millisecond,
QueueSize: 1000,
BufferFlushInterval: time.Duration(conf.Reporter_BufferFlushMs) * time.Millisecond,
QueueSize: conf.Reporter_QueueSize,
},
RPCMetrics: conf.RPCMetrics,
Gen128Bit: conf.Gen128Bit,
}
if len(conf.Tags) > 0 {
cfg.Tags = make([]opentracing.Tag, 0, len(conf.Tags))
for k, v := range conf.Tags {
cfg.Tags = append(cfg.Tags, opentracing.Tag{Key: k, Value: v})
}
}
// use the custom logrus-backed jaeger logger
jLogger := &jaegerLogrus{logger: log}
tracer, closer, err := cfg.NewTracer(
config.Logger(jLogger),
config.ZipkinSharedRPCSpan(true),
config.ZipkinSharedRPCSpan(*conf.ZipkinSharedRPCSpan),
)
)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to initialize Jaeger: %v", err)
log.Errorf("failed to initialize Jaeger: %v", err)
return opentracing.NoopTracer{}, nil
}
opentracing.SetGlobalTracer(tracer)
container.AddCloseAble(func() { _ = closer.Close() })
return tracer, closer, nil
return tracer, nil
}, o.DiOptions()...)
}