Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,9 @@ func WithMinimumReadDBStatsInterval(interval time.Duration) StatsOption {
o.minimumReadDBStatsInterval = interval
})
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// WithSkipBatchQueries will disable tracing individuals batch queries, only tracing the batch itself

func WithSkipBatchQueries() Option {
return optionFunc(func(cfg *tracerConfig) {
cfg.skipBatchQueries = true
})
}
53 changes: 26 additions & 27 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ import (
const (
tracerName = "github.com/exaring/otelpgx"
meterName = "github.com/exaring/otelpgx"
startTimeCtxKey = "otelpgxStartTime"
sqlOperationUnknown = "UNKNOWN"
)

type (
startTimeCtxKey struct{}
batchMarkerCtxKey struct{}
)

const (
pgxOperationQuery = "query"
pgxOperationCopy = "copy"
Expand Down Expand Up @@ -58,20 +62,13 @@ var _ pgxpool.AcquireTracer = (*Tracer)(nil)
// Tracer is a wrapper around the pgx tracer interfaces which instrument
// queries with both tracing and metrics.
type Tracer struct {
tracer trace.Tracer
meter metric.Meter
tracerAttrs []attribute.KeyValue
meterAttrs []attribute.KeyValue
tracer trace.Tracer
meter metric.Meter

operationDuration metric.Int64Histogram
operationErrors metric.Int64Counter

trimQuerySpanName bool
spanNameFunc SpanNameFunc
prefixQuerySpanName bool
logSQLStatement bool
logConnectionDetails bool
includeParams bool
*tracerConfig
}

type tracerConfig struct {
Expand All @@ -87,6 +84,7 @@ type tracerConfig struct {
logSQLStatement bool
logConnectionDetails bool
includeParams bool
skipBatchQueries bool // if true, individual batch queries will not be traced, only the batch itself
}

// NewTracer returns a new Tracer.
Expand All @@ -113,15 +111,9 @@ func NewTracer(opts ...Option) *Tracer {
}

tracer := &Tracer{
tracer: cfg.tracerProvider.Tracer(tracerName, trace.WithInstrumentationVersion(findOwnImportedVersion())),
meter: cfg.meterProvider.Meter(meterName, metric.WithInstrumentationVersion(findOwnImportedVersion())),
tracerAttrs: cfg.tracerAttrs,
meterAttrs: cfg.meterAttrs,
trimQuerySpanName: cfg.trimQuerySpanName,
spanNameFunc: cfg.spanNameFunc,
prefixQuerySpanName: cfg.prefixQuerySpanName,
logSQLStatement: cfg.logSQLStatement,
includeParams: cfg.includeParams,
tracer: cfg.tracerProvider.Tracer(tracerName, trace.WithInstrumentationVersion(findOwnImportedVersion())),
meter: cfg.meterProvider.Meter(meterName, metric.WithInstrumentationVersion(findOwnImportedVersion())),
tracerConfig: cfg,
}

tracer.createMetrics()
Expand Down Expand Up @@ -179,7 +171,7 @@ func (t *Tracer) incrementOperationErrorCount(ctx context.Context, err error, pg

// recordOperationDuration will compute and record the time since the start of an operation.
func (t *Tracer) recordOperationDuration(ctx context.Context, pgxOperation string) {
if startTime, ok := ctx.Value(startTimeCtxKey).(time.Time); ok {
if startTime, ok := ctx.Value(startTimeCtxKey{}).(time.Time); ok {
t.operationDuration.Record(ctx, time.Since(startTime).Milliseconds(), metric.WithAttributeSet(
attribute.NewSet(append(t.meterAttrs, PGXOperationTypeKey.String(pgxOperation))...),
))
Expand Down Expand Up @@ -229,7 +221,12 @@ func connectionAttributesFromConfig(config *pgx.ConnConfig) trace.SpanStartOptio
// TraceQueryStart is called at the beginning of Query, QueryRow, and Exec calls.
// The returned context is used for the rest of the call and will be passed to TraceQueryEnd.
func (t *Tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
ctx = context.WithValue(ctx, startTimeCtxKey, time.Now())
if skipBatchQueries, ok := ctx.Value(batchMarkerCtxKey{}).(bool); ok && skipBatchQueries && t.skipBatchQueries {
// Short circuit query tracing if we're in a batch and [WithSkipBatchQueries] was used.
return ctx
}

ctx = context.WithValue(ctx, startTimeCtxKey{}, time.Now())

if !trace.SpanFromContext(ctx).IsRecording() {
return ctx
Expand Down Expand Up @@ -289,7 +286,7 @@ func (t *Tracer) TraceQueryEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceQ
// returned context is used for the rest of the call and will be passed to
// TraceCopyFromEnd.
func (t *Tracer) TraceCopyFromStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceCopyFromStartData) context.Context {
ctx = context.WithValue(ctx, startTimeCtxKey, time.Now())
ctx = context.WithValue(ctx, startTimeCtxKey{}, time.Now())

if !trace.SpanFromContext(ctx).IsRecording() {
return ctx
Expand Down Expand Up @@ -330,7 +327,9 @@ func (t *Tracer) TraceCopyFromEnd(ctx context.Context, _ *pgx.Conn, data pgx.Tra
// context is used for the rest of the call and will be passed to
// TraceBatchQuery and TraceBatchEnd.
func (t *Tracer) TraceBatchStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceBatchStartData) context.Context {
ctx = context.WithValue(ctx, startTimeCtxKey, time.Now())
ctx = context.WithValue(ctx, startTimeCtxKey{}, time.Now())

ctx = context.WithValue(ctx, batchMarkerCtxKey{}, true)

if !trace.SpanFromContext(ctx).IsRecording() {
return ctx
Expand Down Expand Up @@ -420,7 +419,7 @@ func (t *Tracer) TraceBatchEnd(ctx context.Context, _ *pgx.Conn, data pgx.TraceB
// calls. The returned context is used for the rest of the call and will be
// passed to TraceConnectEnd.
func (t *Tracer) TraceConnectStart(ctx context.Context, data pgx.TraceConnectStartData) context.Context {
ctx = context.WithValue(ctx, startTimeCtxKey, time.Now())
ctx = context.WithValue(ctx, startTimeCtxKey{}, time.Now())

if !trace.SpanFromContext(ctx).IsRecording() {
return ctx
Expand Down Expand Up @@ -456,7 +455,7 @@ func (t *Tracer) TraceConnectEnd(ctx context.Context, data pgx.TraceConnectEndDa
// context is used for the rest of the call and will be passed to
// TracePrepareEnd.
func (t *Tracer) TracePrepareStart(ctx context.Context, conn *pgx.Conn, data pgx.TracePrepareStartData) context.Context {
ctx = context.WithValue(ctx, startTimeCtxKey, time.Now())
ctx = context.WithValue(ctx, startTimeCtxKey{}, time.Now())

if !trace.SpanFromContext(ctx).IsRecording() {
return ctx
Expand Down Expand Up @@ -509,7 +508,7 @@ func (t *Tracer) TracePrepareEnd(ctx context.Context, _ *pgx.Conn, data pgx.Trac
// TraceAcquireStart is called at the beginning of Acquire.
// The returned context is used for the rest of the call and will be passed to the TraceAcquireEnd.
func (t *Tracer) TraceAcquireStart(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceAcquireStartData) context.Context {
ctx = context.WithValue(ctx, startTimeCtxKey, time.Now())
ctx = context.WithValue(ctx, startTimeCtxKey{}, time.Now())

if !trace.SpanFromContext(ctx).IsRecording() {
return ctx
Expand Down