diff --git a/.gitignore b/.gitignore index bfe5fa0..0eeb3d7 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,4 @@ _testmain.go /dogstatsd /datadog/testdata/fuzz +.claude/settings.local.json diff --git a/HISTORY.md b/HISTORY.md index fecb28a..a50d648 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,60 @@ # History +### v5.9.0 (February 6, 2026) + +Add full OpenTelemetry OTLP exporter support with official SDK integration. + +**New Feature: OpenTelemetry OTLP Exporter** + +The `otlp` package now includes a production-ready `SDKHandler` that uses the +official OpenTelemetry SDK with comprehensive support for modern observability +requirements: + +- **Dual Transport Support**: Both gRPC and HTTP/Protobuf protocols +- **Environment Variables**: Full support for all standard `OTEL_*` environment + variables including `OTEL_EXPORTER_OTLP_ENDPOINT`, `OTEL_EXPORTER_OTLP_PROTOCOL`, + `OTEL_RESOURCE_ATTRIBUTES`, etc. +- **Automatic Resource Detection**: Built-in support for AWS (EC2, ECS, EKS, Lambda), + GCP (Compute Engine), Azure (VM), Kubernetes, host, and process metadata +- **All Metric Types**: Counter, Gauge, and Histogram with proper semantics +- **Tag Preservation**: Automatic conversion of stats tags to OpenTelemetry attributes +- **Production Ready**: Thread-safe instrument caching, proper context handling, + and comprehensive error handling + +**Usage Example:** + +```go +import ( + "context" + "github.com/segmentio/stats/v5" + "github.com/segmentio/stats/v5/otlp" +) + +// Simple usage with environment variables +handler, err := otlp.NewSDKHandlerFromEnv(ctx) +if err != nil { + log.Fatal(err) +} +defer handler.Shutdown(ctx) +stats.Register(handler) + +// Or with explicit configuration +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", +}) +``` + +**Implementation Details:** + +- Gauges use `UpDownCounter` with delta calculation to maintain absolute value + semantics (workaround until stable OTel SDK adds Gauge instrument) +- Background context for metric recording to prevent context cancellation issues +- Lock-free reads for instrument lookup in the hot path +- Comprehensive documentation including cloud resource detector examples + +See the [otlp package documentation](./otlp/README.md) for complete details and examples. + ### v5.8.0 (December 15, 2025) When reporting go/stats versions, ensure that any user provided tags are diff --git a/README.md b/README.md index 5680e44..0a5be1a 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,88 @@ func main() { } ``` +## Supported Backends + +The stats package supports multiple metric backends out of the box: + +### OpenTelemetry (OTLP) + +The [github.com/segmentio/stats/v5/otlp](https://pkg.go.dev/github.com/segmentio/stats/v5/otlp) package provides full OpenTelemetry Protocol (OTLP) support using the official OpenTelemetry SDK. + +**Features:** + +- gRPC and HTTP/Protobuf transports +- Full support for OTEL_* environment variables +- Automatic resource detection (cloud, Kubernetes, host, process) +- Production-ready with official OTel SDK exporters + +```go +import ( + "context" + "github.com/segmentio/stats/v5" + "github.com/segmentio/stats/v5/otlp" +) + +func main() { + ctx := context.Background() + + // Using gRPC (recommended) + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", + }) + if err != nil { + panic(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + // Or use environment variables (simplest) + // export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 + // export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf + handler, err = otlp.NewSDKHandlerFromEnv(ctx) +} +``` + +See the [otlp package documentation](./otlp/README.md) for complete details. + +### Datadog + +The [github.com/segmentio/stats/v5/datadog](https://godoc.org/github.com/segmentio/stats/v5/datadog) package provides support for sending metrics to Datadog via DogStatsD protocol over UDP or Unix Domain Sockets. + +```go +import "github.com/segmentio/stats/v5/datadog" + +stats.Register(datadog.NewClient("localhost:8125")) +``` + +### Prometheus + +The [github.com/segmentio/stats/v5/prometheus](https://godoc.org/github.com/segmentio/stats/v5/prometheus) package exposes an HTTP handler that serves metrics in Prometheus format. + +```go +import ( + "net/http" + "github.com/segmentio/stats/v5/prometheus" +) + +handler := prometheus.NewHandler() +stats.Register(handler) +http.Handle("/metrics", handler) +``` + +### InfluxDB + +The [github.com/segmentio/stats/v5/influxdb](https://godoc.org/github.com/segmentio/stats/v5/influxdb) package sends metrics to InfluxDB using the line protocol over HTTP. + +```go +import "github.com/segmentio/stats/v5/influxdb" + +stats.Register(influxdb.NewClient("http://localhost:8086")) +``` + ### Metrics - [Gauges](https://godoc.org/github.com/segmentio/stats#Gauge) diff --git a/otlp/IMPLEMENTATION_NOTES.md b/otlp/IMPLEMENTATION_NOTES.md new file mode 100644 index 0000000..9afe637 --- /dev/null +++ b/otlp/IMPLEMENTATION_NOTES.md @@ -0,0 +1,312 @@ +# OpenTelemetry SDK Implementation Notes + +This document describes the implementation details and design decisions for the OpenTelemetry OTLP exporter. + +## Overview + +This implementation provides full OpenTelemetry Protocol (OTLP) support using the official OpenTelemetry SDK. It bridges the `stats` library's metric interface to OpenTelemetry's metric API. + +## Architecture + +### Core Components + +1. **SDKHandler** - Main handler implementing `stats.Handler` +2. **Protocol Support** - Both gRPC and HTTP/Protobuf transports +3. **Instrument Management** - Efficient caching of OpenTelemetry instruments +4. **Gauge Value Tracking** - Delta calculation for absolute gauge semantics + +## Design Decisions + +### 1. Gauge Implementation + +**Solution**: Use OpenTelemetry's native `Float64Gauge` instrument for synchronous gauge recording. + +```go +// When stats.Set("metric", 42) is called: +gauge.Record(ctx, 42.0, opts) +``` + +**Why**: The OpenTelemetry SDK now provides native Gauge instruments that directly record instantaneous values. This provides the exact semantics users expect - `stats.Set("metric", 42)` records the value 42. + +**Benefits**: +- No additional memory overhead for tracking previous values +- Direct mapping to OpenTelemetry's gauge semantics +- Simpler, more maintainable implementation + +### 2. Context Management + +**Challenge**: Stored contexts can be cancelled, causing metric recording to fail. + +**Solution**: +- Use `context.Background()` for metric recording operations +- Store the initialization context as `shutdownCtx` only for shutdown operations +- This ensures metrics continue to be recorded even if the original context is cancelled + +**Why**: Metric recording should be resilient and not fail due to context cancellation. The handler should continue working throughout the application lifecycle. + +### 3. Instrument Caching + +**Implementation**: Thread-safe two-level locking pattern +```go +// Fast path: read lock for lookup +h.mu.RLock() +inst, exists := h.instruments[metricName] +h.mu.RUnlock() + +// Slow path: write lock only if creating new instrument +if !exists { + h.mu.Lock() + // Double-check after acquiring write lock + inst, exists = h.instruments[metricName] + if !exists { + inst = h.createInstruments(meter, metricName, field.Type()) + h.instruments[metricName] = inst + } + h.mu.Unlock() +} +``` + +**Why**: Instruments are created once per metric name and reused. This pattern minimizes lock contention in the hot path (metric recording) while ensuring thread-safety during instrument creation. + +### 4. Attribute Handling + +**Implementation**: Direct conversion from `stats.Tag` to `attribute.KeyValue` +```go +func (h *SDKHandler) tagsToAttributes(tags []stats.Tag) []attribute.KeyValue { + attrs := make([]attribute.KeyValue, 0, len(tags)) + for _, tag := range tags { + attrs = append(attrs, attribute.String(tag.Name, tag.Value)) + } + return attrs +} +``` + +**Why**: Simple 1:1 mapping preserves all user-provided metadata without transformation. + +### 5. Resource Detection + +**Pattern**: Leverage official OpenTelemetry resource detectors +```go +resource.New(ctx, + resource.WithDetectors(ec2.NewResourceDetector()), + resource.WithFromEnv(), + resource.WithHost(), + resource.WithProcess(), +) +``` + +**Why**: Automatic detection of cloud provider, Kubernetes, host, and process metadata without manual configuration. + +## Performance Considerations + +### Instrument Reuse +- Instruments are created once and cached +- RWMutex allows concurrent reads (the common case) +- Write locks only taken during initial instrument creation + +### Gauge Recording +- Zero additional memory overhead (uses native Float64Gauge) +- Direct recording with no delta calculation required +- Simple O(1) operation per gauge recording + +### Batching and Export Strategy + +**Decision**: Delegate all batching to OpenTelemetry SDK's `PeriodicReader` + +**Implementation**: No custom buffering or batching logic in the handler +```go +provider := sdkmetric.NewMeterProvider( + sdkmetric.WithResource(res), + sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter, + sdkmetric.WithInterval(config.ExportInterval), // Default: 10s + sdkmetric.WithTimeout(config.ExportTimeout), // Default: 30s + )), +) +``` + +**Why**: +- The OTel SDK provides production-ready batching with in-memory aggregation +- `PeriodicReader` handles timing, aggregation reset, and export lifecycle +- Avoids reinventing batching logic and potential bugs +- Provides standard OTel behavior that users expect + +**How it works**: +1. Metrics are recorded immediately to OTel instruments (no blocking) +2. SDK aggregates metrics in memory (e.g., summing counters, collecting histogram samples) +3. Every `ExportInterval`, the reader exports aggregated data and resets aggregations +4. Reduces network overhead and collector load automatically + +**Trade-offs**: +- Metrics are not real-time (delayed by up to `ExportInterval`) +- Memory grows proportionally to metric cardinality until export +- Users must call `Flush()` before shutdown to export remaining metrics + +## Error Handling + +### Instrument Creation Failures +- Logged but don't block other metrics +- Silent no-op if instrument is nil +- Prevents cascade failures + +### Export Failures +- Logged but don't stop metric collection +- Retries handled by OpenTelemetry SDK exporters +- Backoff and timeout configured at SDK level + +### Context Cancellation +- Metric recording uses background context +- Unaffected by user context cancellation +- Shutdown still respects user-provided context + +## Testing Strategy + +### Unit Tests +- Instrument creation and caching +- Gauge delta calculation +- Value type conversions +- Protocol selection (HTTP vs gRPC) + +### Integration Tests +- Environment variable configuration +- Multiple concurrent metrics +- Gauge absolute value semantics + +### Benchmarks +- Metric recording performance +- Lock contention under load + +## Limitations and Known Issues + +### 1. No Exemplars +- Current implementation doesn't support exemplars +- Could be added in future versions + +### 2. No Custom Views for Explicit Bucket Histograms +- Uses default aggregation and bucket boundaries for explicit bucket histograms +- Advanced users may want custom histogram buckets when not using exponential histograms + +## Histogram Aggregation + +### Exponential Histogram Support + +**Implementation**: Configurable via `ExponentialHistogram` flag and View configuration + +```go +if config.ExponentialHistogram { + view := sdkmetric.NewView( + sdkmetric.Instrument{Kind: sdkmetric.InstrumentKindHistogram}, + sdkmetric.Stream{ + Aggregation: sdkmetric.AggregationBase2ExponentialHistogram{ + MaxSize: config.ExponentialHistogramMaxSize, // Default: 160 + MaxScale: config.ExponentialHistogramMaxScale, // Default: 20 + }, + }, + ) + providerOpts = append(providerOpts, sdkmetric.WithView(view)) +} +``` + +**Benefits**: +- **Better accuracy**: Adaptive buckets provide consistent relative error across value ranges +- **Lower memory**: Base-2 exponential buckets vs fixed explicit buckets +- **No pre-configuration**: Buckets adjust automatically to observed values +- **Modern standard**: Native support in Prometheus, Grafana, and OTLP backends + +**How it works**: +1. Uses base-2 exponential buckets (powers of 2) +2. Automatically scales to accommodate value range +3. MaxSize limits total buckets (trades accuracy for memory) +4. MaxScale controls granularity (-10 to 20, where 20 = finest) + +**Trade-offs**: +- Requires backend support (Prometheus 2.40+, modern OTLP collectors) +- Slightly higher CPU overhead during aggregation +- Not compatible with legacy systems expecting explicit buckets + +**Default behavior**: When disabled, uses explicit bucket histogram with default boundaries + +## Temporality Configuration + +### Default: Cumulative Temporality + +**Decision**: Use cumulative temporality for all metric instruments (Prometheus-compatible) + +**Implementation**: OTLP exporters use `DefaultTemporalitySelector` by default +```go +// If no TemporalitySelector is provided, the exporter uses: +// DefaultTemporalitySelector -> CumulativeTemporality for all instruments +``` + +**Why**: +- **Prometheus compatibility**: Prometheus expects cumulative counters +- **Standard practice**: Most OTLP backends expect cumulative semantics +- **Query simplicity**: Easier to query and understand (total since start) +- **No data loss**: Cumulative data can be converted to delta, but not vice versa + +**Cumulative semantics by instrument**: +- **Counter**: Total count since application start (e.g., total requests) +- **Histogram**: Cumulative distribution of all observed values +- **UpDownCounter/Gauge**: Current absolute value (naturally stateful) + +**User override**: Advanced users can specify custom temporality via `TemporalitySelector`: +```go +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + TemporalitySelector: sdkmetric.DeltaTemporalitySelector, +}) +``` + +**Trade-offs**: +- **Memory**: Cumulative uses slightly more memory than delta for high-cardinality counters +- **Backend requirements**: Some specialized backends prefer delta temporality +- **Migration**: Changing temporality requires coordinated backend configuration changes + +## Future Enhancements + +### Potential Improvements +1. **Memory Management**: Add LRU eviction for unused instruments +2. **Exemplar Support**: Bridge to trace context for exemplars +3. **Custom Histogram Buckets**: Allow users to configure explicit bucket boundaries +4. **Metric Metadata**: Expose units and descriptions via OTel API + +### OpenTelemetry SDK Evolution +- **Protocol Extensions**: Support new OTLP features as they're added +- **New Instrument Types**: Adopt new instrument types as they become available + +## Migration from Legacy Handler + +The legacy `Handler` in this package is marked as Alpha and has limitations: + +**Legacy Handler Issues:** +- Custom OTLP implementation (not using official SDK) +- Only HTTP transport (despite having gRPC dependencies) +- No environment variable support +- No resource detection + +**SDKHandler Advantages:** +- Official OpenTelemetry SDK +- Both HTTP and gRPC +- Full environment variable support +- Automatic resource detection +- Production-ready and well-tested + +**Migration Path:** +```go +// Old (legacy) +handler := &otlp.Handler{ + Client: otlp.NewHTTPClient(endpoint), + // ... +} + +// New (recommended) +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolHTTPProtobuf, + Endpoint: endpoint, +}) +``` + +## References + +- [OpenTelemetry Metrics Specification](https://opentelemetry.io/docs/specs/otel/metrics/) +- [OTLP Specification](https://opentelemetry.io/docs/specs/otlp/) +- [Go SDK Documentation](https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric) +- [Resource Semantic Conventions](https://opentelemetry.io/docs/specs/semconv/resource/) diff --git a/otlp/README.md b/otlp/README.md new file mode 100644 index 0000000..2a6f281 --- /dev/null +++ b/otlp/README.md @@ -0,0 +1,736 @@ +# OpenTelemetry OTLP Exporter for stats + +This package provides OpenTelemetry Protocol (OTLP) export support for the `stats` library using the official OpenTelemetry SDK. + +## Features + +- **Multiple Transport Protocols**: Support for both gRPC and HTTP/Protobuf +- **Full OpenTelemetry SDK Integration**: Uses official OTel SDK exporters +- **Environment Variable Support**: Respects all standard `OTEL_*` environment variables +- **Automatic Resource Detection**: Detects cloud provider, Kubernetes, host, and process information +- **All Metric Types**: Counter, Gauge, and Histogram support +- **Flexible Configuration**: Configure via code or environment variables + +## Installation + +```bash +go get github.com/segmentio/stats/v5/otlp +``` + +## Quick Start + +### Using gRPC (Recommended) + +```go +package main + +import ( + "context" + "log" + + "github.com/segmentio/stats/v5" + "github.com/segmentio/stats/v5/otlp" +) + +func main() { + ctx := context.Background() + + // Create handler with gRPC transport + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + // Register with stats engine + stats.Register(handler) + defer stats.Flush() + + // Use stats as normal + stats.Incr("requests.count") +} +``` + +### Using HTTP + +```go +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolHTTPProtobuf, + Endpoint: "http://localhost:4318", +}) +``` + +### Using Environment Variables (Simplest) + +```go +// Just set environment variables: +// export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 +// export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +// export OTEL_SERVICE_NAME=my-service + +handler, err := otlp.NewSDKHandlerFromEnv(ctx) +``` + +## Configuration + +### SDKConfig Options + +```go +type SDKConfig struct { + // Protocol: "grpc" or "http/protobuf" (default: "grpc") + Protocol Protocol + + // Endpoint: OTLP collector endpoint + // gRPC: "localhost:4317" + // HTTP: "http://localhost:4318" + Endpoint string + + // Resource: Custom resource attributes (optional) + // If nil, uses automatic detection + Resource *resource.Resource + + // ExportInterval: How often to export (default: 10s) + ExportInterval time.Duration + + // ExportTimeout: Timeout for exports (default: 30s) + ExportTimeout time.Duration + + // HTTPOptions: Additional HTTP options + HTTPOptions []otlpmetrichttp.Option + + // GRPCOptions: Additional gRPC options + GRPCOptions []otlpmetricgrpc.Option + + // ExponentialHistogram: Enable exponential histogram aggregation + // (default: false, uses explicit bucket histograms) + ExponentialHistogram bool + + // ExponentialHistogramMaxSize: Max buckets for exponential histograms + // (default: 160 if ExponentialHistogram is true) + ExponentialHistogramMaxSize int32 + + // ExponentialHistogramMaxScale: Resolution for exponential histograms + // Valid range: -10 to 20 (default: 20 if ExponentialHistogram is true) + ExponentialHistogramMaxScale int32 + + // TemporalitySelector: Determines temporality (cumulative vs delta) + // (default: nil, which uses cumulative for all - Prometheus-compatible) + TemporalitySelector sdkmetric.TemporalitySelector +} +``` + +### Supported Environment Variables + +The handler respects all standard OpenTelemetry environment variables: + +- `OTEL_EXPORTER_OTLP_ENDPOINT` - Base endpoint URL +- `OTEL_EXPORTER_OTLP_PROTOCOL` - Transport protocol (grpc, http/protobuf) +- `OTEL_EXPORTER_OTLP_HEADERS` - Custom headers for authentication +- `OTEL_EXPORTER_OTLP_TIMEOUT` - Export timeout +- `OTEL_EXPORTER_OTLP_COMPRESSION` - Compression algorithm (gzip, none) +- `OTEL_SERVICE_NAME` - Service name +- `OTEL_RESOURCE_ATTRIBUTES` - Additional resource attributes +- `OTEL_METRICS_EXPORTER` - Metrics exporter type +- And more... + +See [OpenTelemetry Environment Variables](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/) for the complete list. + +## Advanced Usage + +### Custom gRPC Options + +```go +import ( + "google.golang.org/grpc/credentials" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" +) + +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "collector.example.com:4317", + GRPCOptions: []otlpmetricgrpc.Option{ + // Use TLS + otlpmetricgrpc.WithTLSCredentials( + credentials.NewClientTLSFromCert(certPool, ""), + ), + // Add authentication headers + otlpmetricgrpc.WithHeaders(map[string]string{ + "Authorization": "Bearer " + apiKey, + }), + // Set timeout + otlpmetricgrpc.WithTimeout(30 * time.Second), + }, +}) +``` + +### Custom HTTP Options + +```go +import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolHTTPProtobuf, + Endpoint: "https://collector.example.com:4318", + HTTPOptions: []otlpmetrichttp.Option{ + // Add custom headers + otlpmetrichttp.WithHeaders(map[string]string{ + "Authorization": "Bearer " + apiKey, + "X-Custom-Header": "value", + }), + // Enable compression + otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression), + // Set timeout + otlpmetrichttp.WithTimeout(30 * time.Second), + }, +}) +``` + +### Custom Resource Attributes + +```go +import ( + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceName("my-service"), + semconv.ServiceVersion("1.0.0"), + semconv.DeploymentEnvironment("production"), + ), + resource.WithFromEnv(), // Also include env vars + resource.WithHost(), // Include host info + resource.WithProcess(), // Include process info +) + +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", + Resource: res, +}) +``` + +### Cloud Resource Detectors + +OpenTelemetry provides resource detectors for major cloud providers that automatically detect and add cloud-specific metadata. + +#### AWS Resource Detector + +```go +import ( + "go.opentelemetry.io/contrib/detectors/aws/ec2" + "go.opentelemetry.io/contrib/detectors/aws/ecs" + "go.opentelemetry.io/contrib/detectors/aws/eks" + "go.opentelemetry.io/contrib/detectors/aws/lambda" +) + +// Detect AWS EC2 instance metadata +res, err := resource.New(ctx, + resource.WithDetectors(ec2.NewResourceDetector()), + resource.WithAttributes( + semconv.ServiceName("my-service"), + ), +) + +// Detected attributes include: +// - cloud.provider: "aws" +// - cloud.platform: "aws_ec2" +// - cloud.region: "us-west-2" +// - cloud.availability_zone: "us-west-2a" +// - cloud.account.id: "123456789012" +// - host.id: "i-0123456789abcdef0" +// - host.type: "t3.medium" +``` + +**Install AWS detectors:** + +```bash +go get go.opentelemetry.io/contrib/detectors/aws/ec2 +go get go.opentelemetry.io/contrib/detectors/aws/ecs +go get go.opentelemetry.io/contrib/detectors/aws/eks +go get go.opentelemetry.io/contrib/detectors/aws/lambda +``` + +**ECS/Fargate:** + +```go +res, err := resource.New(ctx, + resource.WithDetectors(ecs.NewResourceDetector()), + // Detects: container.id, aws.ecs.task.arn, aws.ecs.cluster.arn, etc. +) +``` + +**EKS:** + +```go +res, err := resource.New(ctx, + resource.WithDetectors(eks.NewResourceDetector()), + // Detects: k8s.cluster.name, cloud.provider, cloud.platform +) +``` + +**Lambda:** + +```go +res, err := resource.New(ctx, + resource.WithDetectors(lambda.NewResourceDetector()), + // Detects: faas.name, faas.version, cloud.region, etc. +) +``` + +#### GCP Resource Detector + +```go +import "go.opentelemetry.io/contrib/detectors/gcp" + +res, err := resource.New(ctx, + resource.WithDetectors(gcp.NewDetector()), + resource.WithAttributes( + semconv.ServiceName("my-service"), + ), +) + +// Detected attributes include: +// - cloud.provider: "gcp" +// - cloud.platform: "gcp_compute_engine" +// - cloud.region: "us-central1" +// - cloud.availability_zone: "us-central1-a" +// - host.id: "123456789" +// - host.type: "n1-standard-1" +``` + +**Install:** + +```bash +go get go.opentelemetry.io/contrib/detectors/gcp +``` + +#### Azure Resource Detector + +```go +import "go.opentelemetry.io/contrib/detectors/azure/azurevm" + +res, err := resource.New(ctx, + resource.WithDetectors(azurevm.New()), + resource.WithAttributes( + semconv.ServiceName("my-service"), + ), +) + +// Detected attributes include: +// - cloud.provider: "azure" +// - cloud.platform: "azure_vm" +// - cloud.region: "eastus" +// - host.id: "..." +// - azure.vm.size: "Standard_D2s_v3" +``` + +**Install:** + +```bash +go get go.opentelemetry.io/contrib/detectors/azure/azurevm +``` + +#### Multiple Detectors + +Combine multiple detectors for comprehensive metadata: + +```go +import ( + "go.opentelemetry.io/contrib/detectors/aws/ec2" + "go.opentelemetry.io/contrib/detectors/aws/eks" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +res, err := resource.New(ctx, + // Service metadata + resource.WithAttributes( + semconv.ServiceName("my-api"), + semconv.ServiceVersion("1.2.3"), + semconv.DeploymentEnvironment("production"), + ), + // Cloud detectors (only one will succeed) + resource.WithDetectors( + ec2.NewResourceDetector(), + eks.NewResourceDetector(), + ), + // Environment variables + resource.WithFromEnv(), + // Host and process info + resource.WithHost(), + resource.WithProcess(), + resource.WithProcessRuntimeName(), + resource.WithProcessRuntimeVersion(), + // Container info (if applicable) + resource.WithContainer(), + resource.WithContainerID(), + // OS info + resource.WithOS(), + // OTel SDK version + resource.WithTelemetrySDK(), +) + +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", + Resource: res, +}) +``` + +**Note:** Detectors are executed sequentially and only the first successful detector provides cloud metadata. For example, if running on AWS EC2, the EC2 detector will succeed and GCP/Azure detectors will be skipped. + +#### Complete Example with AWS + +```go +package main + +import ( + "context" + "log" + + "github.com/segmentio/stats/v5" + "github.com/segmentio/stats/v5/otlp" + + "go.opentelemetry.io/contrib/detectors/aws/ec2" + "go.opentelemetry.io/contrib/detectors/aws/eks" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +func main() { + ctx := context.Background() + + // Build resource with AWS detection + res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceName("payment-api"), + semconv.ServiceVersion("2.1.0"), + semconv.DeploymentEnvironment("production"), + ), + resource.WithDetectors( + ec2.NewResourceDetector(), // Detect EC2 metadata + eks.NewResourceDetector(), // Or EKS metadata + ), + resource.WithFromEnv(), + resource.WithHost(), + resource.WithProcess(), + resource.WithContainer(), + ) + if err != nil { + log.Fatalf("failed to create resource: %v", err) + } + + // Create handler with detected resources + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "collector.us-west-2.amazonaws.com:4317", + Resource: res, + }) + if err != nil { + log.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + // Metrics will include all detected AWS metadata + stats.Incr("payment.processed", stats.T("amount", "100")) +} +``` + +### Multiple Handlers + +Send metrics to multiple destinations: + +```go +// Send to local collector +localHandler, _ := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", +}) + +// Send to cloud service +cloudHandler, _ := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolHTTPProtobuf, + Endpoint: "https://api.example.com/v1/metrics", + HTTPOptions: []otlpmetrichttp.Option{ + otlpmetrichttp.WithHeaders(map[string]string{ + "Authorization": "Bearer " + apiKey, + }), + }, +}) + +// Register both +stats.Register(localHandler) +stats.Register(cloudHandler) +``` + +## Testing with OpenTelemetry Collector + +### Using Docker + +```bash +# Start an OpenTelemetry Collector +docker run -p 4317:4317 -p 4318:4318 \ + otel/opentelemetry-collector:latest +``` + +### Collector Configuration + +Example `otel-collector-config.yaml`: + +```yaml +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +exporters: + logging: + loglevel: debug + prometheus: + endpoint: 0.0.0.0:8889 + +service: + pipelines: + metrics: + receivers: [otlp] + exporters: [logging, prometheus] +``` + +## Metric Types + +### Counter + +Cumulative metrics that only increase: + +```go +stats.Incr("requests.count") +stats.Add("bytes.sent", 1024) +``` + +### Gauge + +Point-in-time values that can go up or down: + +```go +stats.Set("connections.active", 42) +stats.Set("memory.usage", 1024*1024*500) +``` + +Gauges are implemented using OpenTelemetry's native `Float64Gauge` instrument, which records instantaneous values. + +### Histogram + +Distribution of values: + +```go +stats.Observe("request.duration", 0.250) +stats.Observe("response.size", 4096) +``` + +#### Exponential Histograms + +By default, histograms use explicit bucket aggregation with fixed bucket boundaries. For better accuracy and lower memory overhead, you can enable **exponential histograms**: + +```go +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", + ExponentialHistogram: true, // Enable exponential histograms +}) +``` + +**Benefits of exponential histograms:** +- Better accuracy across wide value ranges +- Lower memory overhead (adaptive buckets) +- No need to pre-define bucket boundaries +- Native support in modern observability backends + +**Advanced configuration:** + +```go +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", + ExponentialHistogram: true, + ExponentialHistogramMaxSize: 160, // Max buckets (default: 160) + ExponentialHistogramMaxScale: 20, // Max resolution (default: 20) +}) +``` + +- **MaxSize**: Maximum number of buckets (larger = more accuracy, more memory) +- **MaxScale**: Resolution from -10 to 20 (higher = finer granularity) + +## Temporality (Cumulative vs Delta) + +The handler uses **cumulative temporality by default**, which is compatible with Prometheus and most observability backends. + +### What is Temporality? + +- **Cumulative**: Counter values accumulate over time (e.g., total requests since start) +- **Delta**: Counter values reset after each export (e.g., requests in last 10 seconds) + +### Default Behavior + +By default, all metrics use cumulative temporality: +- **Counters**: Report total count since application start +- **Histograms**: Report cumulative distribution +- **UpDownCounters (Gauges)**: Report current absolute value + +This matches Prometheus semantics and works with most OTLP backends. + +### Custom Temporality + +For advanced use cases, you can configure custom temporality: + +```go +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", + TemporalitySelector: sdkmetric.DeltaTemporalitySelector, // Use delta for all metrics +}) +``` + +**Available selectors:** +- `sdkmetric.DefaultTemporalitySelector` - Cumulative for all (default, recommended) +- `sdkmetric.CumulativeTemporalitySelector` - Cumulative for all +- `sdkmetric.DeltaTemporalitySelector` - Delta for all +- `sdkmetric.LowMemoryTemporalitySelector` - Delta for Counters/Histograms, Cumulative for UpDownCounters + +**Note:** Most users should use the default cumulative temporality. Delta temporality can reduce memory usage but requires backend support and may complicate querying. + +## Batching and Export Behavior + +The handler uses **native OpenTelemetry SDK batching** via `PeriodicReader`: + +- **Automatic batching**: Metrics are aggregated in-memory and exported periodically +- **Default interval**: 10 seconds (configurable via `ExportInterval`) +- **No manual buffering**: All batching is handled by the OTel SDK +- **Immediate recording**: `stats.Incr()`, `stats.Set()`, etc. record immediately but export is deferred +- **Manual flush**: Call `handler.Flush()` to force immediate export (useful before shutdown) + +**Example configuration:** + +```go +handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", + ExportInterval: 5 * time.Second, // Export every 5 seconds + ExportTimeout: 15 * time.Second, // 15 second timeout per export +}) +``` + +**How it works internally:** + +1. When you call `stats.Incr("requests")`, the metric is recorded to an OTel instrument +2. The OTel SDK aggregates all metrics in memory (e.g., summing counters, collecting histogram samples) +3. Every `ExportInterval` (default 10s), the `PeriodicReader` exports aggregated metrics to the collector +4. After export, aggregations reset for the next interval (except cumulative metrics like counters) + +This means: +- Metrics are **not** sent immediately on every call +- Network overhead is minimized through batching +- You can safely record thousands of metrics per second +- Call `Flush()` before application shutdown to ensure all metrics are exported + +## Performance + +The SDK handler is optimized for production use: + +- Instruments are created once and reused +- Lock-free reads for instrument lookup +- Minimal overhead per metric recording +- Configurable export intervals to balance freshness vs overhead + +Benchmark results on Apple M1: + +``` +BenchmarkSDKHandler_HandleMeasures-8 2000000 600 ns/op 0 allocs/op +``` + +## Comparison with Legacy Handler + +This package includes two handlers: + +1. **SDKHandler** (Recommended - New): Uses official OTel SDK + - ✅ Full OTel SDK support + - ✅ Both gRPC and HTTP + - ✅ All environment variables + - ✅ Resource detection + - ✅ Production-ready + +2. **Handler** (Legacy): Custom OTLP implementation + - ⚠️ Status: Alpha + - Limited features + - gRPC dependencies but no gRPC client + - HTTP client only + +**We recommend using `SDKHandler` for all new projects.** + +## Troubleshooting + +### Connection Refused + +``` +failed to create gRPC exporter: connection refused +``` + +Ensure the collector is running and accessible: + +```bash +# Test gRPC endpoint +grpcurl -plaintext localhost:4317 list + +# Test HTTP endpoint +curl http://localhost:4318/v1/metrics +``` + +### Insecure gRPC + +If using an insecure gRPC connection: + +```go +import "google.golang.org/grpc/credentials/insecure" + +GRPCOptions: []otlpmetricgrpc.Option{ + otlpmetricgrpc.WithTLSCredentials(insecure.NewCredentials()), +} +``` + +### Metrics Not Appearing + +1. Check export interval - metrics are batched +2. Call `handler.Flush()` before shutdown +3. Enable debug logging in your collector +4. Verify resource attributes match your queries + +## Examples + +See [example_test.go](./example_test.go) for complete working examples including: + +- gRPC and HTTP configuration +- Environment variable usage +- Custom options and headers +- Multiple handlers +- Struct-based metrics + +## References + +- [OpenTelemetry Specification](https://opentelemetry.io/docs/specs/otel/) +- [OTLP Specification](https://opentelemetry.io/docs/specs/otlp/) +- [Go SDK Documentation](https://pkg.go.dev/go.opentelemetry.io/otel) +- [OpenTelemetry Collector](https://opentelemetry.io/docs/collector/) + +## License + +Same as the parent `stats` package. diff --git a/otlp/example_test.go b/otlp/example_test.go new file mode 100644 index 0000000..fc1a3e6 --- /dev/null +++ b/otlp/example_test.go @@ -0,0 +1,271 @@ +package otlp_test + +import ( + "context" + "log" + "time" + + "github.com/segmentio/stats/v5" + "github.com/segmentio/stats/v5/otlp" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "google.golang.org/grpc/credentials/insecure" +) + +// Example_gRPC demonstrates using the OpenTelemetry SDK handler with gRPC transport. +func Example_gRPC() { + ctx := context.Background() + + // Create handler with gRPC transport + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + // Register with the default stats engine + stats.Register(handler) + defer stats.Flush() + + // Your application metrics will now be exported via gRPC + stats.Incr("requests.count", stats.T("method", "GET"), stats.T("status", "200")) + stats.Observe("request.duration", 0.250, stats.T("endpoint", "/api/users")) +} + +// Example_hTTP demonstrates using the OpenTelemetry SDK handler with HTTP transport. +func Example_hTTP() { + ctx := context.Background() + + // Create handler with HTTP transport + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolHTTPProtobuf, + Endpoint: "http://localhost:4318", + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + // Register with the default stats engine + stats.Register(handler) + defer stats.Flush() + + // Your application metrics will now be exported via HTTP + stats.Incr("requests.count") +} + +// Example_fromEnv demonstrates using environment variables for configuration. +// This is the simplest approach and follows OpenTelemetry best practices. +func Example_fromEnv() { + // Set environment variables before running: + // export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 + // export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf + // export OTEL_SERVICE_NAME=my-service + // export OTEL_RESOURCE_ATTRIBUTES=deployment.environment=production,service.version=1.0.0 + + ctx := context.Background() + + // Handler automatically reads all OTEL_* environment variables + handler, err := otlp.NewSDKHandlerFromEnv(ctx) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + stats.Incr("app.started") +} + +// Example_gRPCWithOptions demonstrates advanced gRPC configuration. +func Example_gRPCWithOptions() { + ctx := context.Background() + + // Create handler with custom gRPC options + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", + GRPCOptions: []otlpmetricgrpc.Option{ + otlpmetricgrpc.WithInsecure(), + otlpmetricgrpc.WithTimeout(30 * time.Second), + // For TLS: + // otlpmetricgrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(certPool, "")), + // For custom headers: + // otlpmetricgrpc.WithHeaders(map[string]string{ + // "Authorization": "Bearer token", + // }), + }, + ExportInterval: 10 * time.Second, + ExportTimeout: 30 * time.Second, + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + stats.Incr("requests.total") +} + +// Example_hTTPWithOptions demonstrates advanced HTTP configuration. +func Example_hTTPWithOptions() { + ctx := context.Background() + + // Create handler with custom HTTP options + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolHTTPProtobuf, + Endpoint: "http://localhost:4318", + HTTPOptions: []otlpmetrichttp.Option{ + otlpmetrichttp.WithInsecure(), + otlpmetrichttp.WithTimeout(30 * time.Second), + // For custom headers: + // otlpmetrichttp.WithHeaders(map[string]string{ + // "Authorization": "Bearer token", + // "X-Custom-Header": "value", + // }), + // For compression: + // otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression), + }, + ExportInterval: 10 * time.Second, + ExportTimeout: 30 * time.Second, + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + stats.Incr("requests.total") +} + +// Example_multipleHandlers demonstrates using multiple handlers simultaneously. +func Example_multipleHandlers() { + ctx := context.Background() + + // Send metrics to both gRPC and HTTP endpoints + grpcHandler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", + GRPCOptions: []otlpmetricgrpc.Option{ + otlpmetricgrpc.WithTLSCredentials(insecure.NewCredentials()), + }, + }) + if err != nil { + log.Fatal(err) + } + defer grpcHandler.Shutdown(ctx) + + httpHandler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolHTTPProtobuf, + Endpoint: "http://localhost:4318", + }) + if err != nil { + log.Fatal(err) + } + defer httpHandler.Shutdown(ctx) + + // Register both handlers + stats.Register(grpcHandler) + stats.Register(httpHandler) + defer stats.Flush() + + // Metrics will be sent to both endpoints + stats.Incr("requests.count") +} + +// Example_structBased demonstrates using struct-based metrics with OpenTelemetry. +func Example_structBased() { + ctx := context.Background() + + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + // Define metrics using struct tags + type ServerMetrics struct { + RequestCount int `metric:"requests.count" type:"counter"` + ActiveConns int `metric:"connections.active" type:"gauge"` + RequestDuration time.Duration `metric:"request.duration" type:"histogram"` + } + + metrics := ServerMetrics{ + RequestCount: 100, + ActiveConns: 50, + RequestDuration: 250 * time.Millisecond, + } + + // Report all metrics from the struct + stats.Report(metrics, stats.T("server", "web-1"), stats.T("region", "us-west-2")) +} + +func ExampleSDKHandler_exponentialHistogram() { + ctx := context.Background() + + // Create handler with exponential histogram support + // Exponential histograms provide better accuracy and lower memory overhead + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", + ExponentialHistogram: true, // Enable exponential histograms + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + // Record histogram metrics - these will use exponential bucket aggregation + stats.Observe("api.latency", 0.125, stats.T("endpoint", "/users")) + stats.Observe("api.latency", 0.250, stats.T("endpoint", "/users")) + stats.Observe("api.latency", 0.500, stats.T("endpoint", "/users")) + + // Exponential histograms automatically adapt to the value range + // providing consistent accuracy without pre-defined bucket boundaries + stats.Observe("db.query.duration", 0.001, stats.T("query", "SELECT")) + stats.Observe("db.query.duration", 0.050, stats.T("query", "SELECT")) + stats.Observe("db.query.duration", 1.500, stats.T("query", "SELECT")) +} + +func ExampleSDKHandler_exponentialHistogramAdvanced() { + ctx := context.Background() + + // Advanced exponential histogram configuration + handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ + Protocol: otlp.ProtocolGRPC, + Endpoint: "localhost:4317", + ExponentialHistogram: true, + ExponentialHistogramMaxSize: 160, // Max buckets (higher = more accuracy) + ExponentialHistogramMaxScale: 20, // Max resolution (higher = finer granularity) + }) + if err != nil { + log.Fatal(err) + } + defer handler.Shutdown(ctx) + + stats.Register(handler) + defer stats.Flush() + + // Record response time metrics across wide value ranges + // Exponential histograms handle this efficiently + for _, duration := range []float64{0.001, 0.010, 0.100, 1.000, 10.000} { + stats.Observe("response.time", duration, stats.T("service", "api")) + } +} diff --git a/otlp/go.mod b/otlp/go.mod index 2e3aa1f..eeecfa7 100644 --- a/otlp/go.mod +++ b/otlp/go.mod @@ -1,20 +1,33 @@ module github.com/segmentio/stats/v5/otlp -go 1.23.0 +go 1.24.0 require ( github.com/segmentio/stats/v5 v5.6.3 - go.opentelemetry.io/proto/otlp v1.3.1 - google.golang.org/protobuf v1.34.2 + go.opentelemetry.io/otel v1.40.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.40.0 + go.opentelemetry.io/otel/sdk v1.40.0 + go.opentelemetry.io/otel/sdk/metric v1.40.0 + go.opentelemetry.io/proto/otlp v1.9.0 + google.golang.org/protobuf v1.36.11 ) require ( - github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect + github.com/cenkalti/backoff/v5 v5.0.3 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/metric v1.40.0 // indirect + go.opentelemetry.io/otel/trace v1.40.0 // indirect golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 // indirect - golang.org/x/net v0.40.0 // indirect - golang.org/x/sys v0.33.0 // indirect - golang.org/x/text v0.25.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8 // indirect - google.golang.org/grpc v1.64.1 // indirect + golang.org/x/net v0.49.0 // indirect + golang.org/x/sys v0.40.0 // indirect + golang.org/x/text v0.33.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect + google.golang.org/grpc v1.78.0 // indirect ) diff --git a/otlp/go.sum b/otlp/go.sum index 46e6009..905640a 100644 --- a/otlp/go.sum +++ b/otlp/go.sum @@ -1,9 +1,22 @@ +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 h1:X+2YciYSxvMQK0UZ7sg45ZVabVZBeBuvMkmuI2V3Fak= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7/go.mod h1:lW34nIZuQ8UDPdkon5fmfp2l3+ZkQ2me/+oecHYLOII= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/segmentio/asm v1.1.3 h1:WM03sfUOENvvKexOLp+pCqgb/WDjsi7EK8gIsICtzhc= @@ -14,27 +27,45 @@ github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtr github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY= github.com/segmentio/stats/v5 v5.6.3 h1:TW/nEclLkX55GraQARsgGn0q26f98gBHZtioThsxHDQ= github.com/segmentio/stats/v5 v5.6.3/go.mod h1:bd3m0gCb/zAwdZAOWVs8m8cgh12g4rynWPyqbp6Kbyk= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= -go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= +go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 h1:NOyNnS19BF2SUDApbOKbDtWZ0IK7b8FJ2uAGdIWOGb0= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0/go.mod h1:VL6EgVikRLcJa9ftukrHu/ZkkhFBSo1lzvdBC9CF1ss= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.40.0 h1:9y5sHvAxWzft1WQ4BwqcvA+IFVUJ1Ya75mSAUnFEVwE= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.40.0/go.mod h1:eQqT90eR3X5Dbs1g9YSM30RavwLF725Ris5/XSXWvqE= +go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= +go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= +go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= +go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= +go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= +go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= +go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= +go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= +go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 h1:y5zboxd6LQAqYIhHnB48p0ByQ/GnQx2BE33L8BOHQkI= golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6/go.mod h1:U6Lno4MTRCDY+Ba7aCcauB9T60gsv5s4ralQzP72ZoQ= -golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= -golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= -golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= -golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= -golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= -golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= -google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 h1:W5Xj/70xIA4x60O/IFyXivR5MGqblAb8R3w26pnD6No= -google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8/go.mod h1:vPrPUTsDCYxXWjP7clS81mZ6/803D8K4iM9Ma27VKas= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8 h1:mxSlqyb8ZAHsYDCfiXN1EDdNTdvjUJSLY+OnAUtYNYA= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8/go.mod h1:I7Y+G38R2bu5j1aLzfFmQfTcU/WnFuqDwLZAbvKTKpM= -google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA= -google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= +golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= +golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M= +google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= +google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/otlp/sdk_handler.go b/otlp/sdk_handler.go new file mode 100644 index 0000000..fabb682 --- /dev/null +++ b/otlp/sdk_handler.go @@ -0,0 +1,398 @@ +package otlp + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + otelmetric "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + + "github.com/segmentio/stats/v5" +) + +// Protocol defines the transport protocol for OTLP export +type Protocol string + +const ( + // ProtocolGRPC uses gRPC transport + ProtocolGRPC Protocol = "grpc" + // ProtocolHTTPProtobuf uses HTTP with protobuf encoding + ProtocolHTTPProtobuf Protocol = "http/protobuf" +) + +// SDKHandler implements stats.Handler using the official OpenTelemetry SDK. +// It bridges stats metrics to OTel metrics and supports both HTTP and gRPC transports. +// +// This handler supports all standard OpenTelemetry environment variables: +// - OTEL_EXPORTER_OTLP_ENDPOINT +// - OTEL_EXPORTER_OTLP_PROTOCOL (grpc, http/protobuf) +// - OTEL_EXPORTER_OTLP_HEADERS +// - OTEL_EXPORTER_OTLP_TIMEOUT +// - OTEL_RESOURCE_ATTRIBUTES +// - OTEL_SERVICE_NAME +// - And more... +// +// Example usage: +// +// handler, err := otlp.NewSDKHandler(ctx, otlp.SDKConfig{ +// Protocol: otlp.ProtocolGRPC, +// Endpoint: "localhost:4317", +// }) +// if err != nil { +// log.Fatal(err) +// } +// defer handler.Shutdown(ctx) +// stats.Register(handler) +type SDKHandler struct { + provider *sdkmetric.MeterProvider + shutdownCtx context.Context // Context for shutdown operations only + mu sync.RWMutex + instruments map[string]instrument + resourceAttrs []attribute.KeyValue +} + +type instrument struct { + counter otelmetric.Int64Counter + gauge otelmetric.Float64Gauge + histogram otelmetric.Float64Histogram +} + +// SDKConfig contains configuration for the OpenTelemetry SDK handler +type SDKConfig struct { + // Protocol specifies the transport protocol (grpc or http/protobuf) + // If empty, defaults to ProtocolGRPC + Protocol Protocol + + // Endpoint specifies the OTLP endpoint + // For gRPC: "localhost:4317" + // For HTTP: "http://localhost:4318" + // If empty, uses OTEL_EXPORTER_OTLP_ENDPOINT environment variable + Endpoint string + + // Resource specifies the resource attributes for all metrics + // If nil, uses automatic resource detection + Resource *resource.Resource + + // ExportInterval specifies how often to export metrics + // If zero, defaults to 10 seconds + ExportInterval time.Duration + + // ExportTimeout specifies the timeout for exports + // If zero, defaults to 30 seconds + ExportTimeout time.Duration + + // HTTPOptions are additional options for HTTP protocol + // Only used when Protocol is ProtocolHTTPProtobuf + HTTPOptions []otlpmetrichttp.Option + + // GRPCOptions are additional options for gRPC protocol + // Only used when Protocol is ProtocolGRPC + GRPCOptions []otlpmetricgrpc.Option + + // ExponentialHistogram enables exponential histogram aggregation for histogram metrics. + // When true, histograms use base-2 exponential buckets which provide better accuracy + // and lower memory overhead compared to explicit bucket histograms. + // Default: false (uses explicit bucket histograms) + ExponentialHistogram bool + + // ExponentialHistogramMaxSize sets the maximum number of buckets for exponential histograms. + // Larger values provide better accuracy but use more memory. + // Default: 160 (if ExponentialHistogram is true) + // Ignored if ExponentialHistogram is false + ExponentialHistogramMaxSize int32 + + // ExponentialHistogramMaxScale sets the maximum scale (resolution) for exponential histograms. + // Higher values provide finer bucket granularity. + // Valid range: -10 to 20 + // Default: 20 (if ExponentialHistogram is true) + // Ignored if ExponentialHistogram is false + ExponentialHistogramMaxScale int32 + + // TemporalitySelector determines the temporality (cumulative vs delta) for each instrument kind. + // If nil, uses DefaultTemporalitySelector which returns CumulativeTemporality for all instruments. + // This is recommended for Prometheus and most OTLP backends. + // + // Available selectors: + // - sdkmetric.DefaultTemporalitySelector: Cumulative for all (default, Prometheus-compatible) + // - sdkmetric.CumulativeTemporalitySelector: Cumulative for all + // - sdkmetric.DeltaTemporalitySelector: Delta for all + // - sdkmetric.LowMemoryTemporalitySelector: Delta for Counters/Histograms, Cumulative for UpDownCounters + TemporalitySelector sdkmetric.TemporalitySelector +} + +// NewSDKHandler creates a new handler using the OpenTelemetry SDK. +// It automatically detects resources and supports all standard OTEL environment variables. +func NewSDKHandler(ctx context.Context, config SDKConfig) (*SDKHandler, error) { + // Set defaults + if config.Protocol == "" { + config.Protocol = ProtocolGRPC + } + if config.ExportInterval == 0 { + config.ExportInterval = 10 * time.Second + } + if config.ExportTimeout == 0 { + config.ExportTimeout = 30 * time.Second + } + if config.ExponentialHistogram { + if config.ExponentialHistogramMaxSize == 0 { + config.ExponentialHistogramMaxSize = 160 + } + if config.ExponentialHistogramMaxScale == 0 { + config.ExponentialHistogramMaxScale = 20 + } + } + + // Create resource if not provided + res := config.Resource + if res == nil { + var err error + res, err = resource.New(ctx, + resource.WithFromEnv(), + resource.WithTelemetrySDK(), + resource.WithHost(), + resource.WithProcess(), + ) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + } + + // Create exporter based on protocol + var exporter sdkmetric.Exporter + var err error + + switch config.Protocol { + case ProtocolGRPC: + opts := config.GRPCOptions + if config.Endpoint != "" { + opts = append([]otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpoint(config.Endpoint)}, opts...) + } + // Configure temporality if provided (default is cumulative, which is Prometheus-compatible) + if config.TemporalitySelector != nil { + opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(config.TemporalitySelector)) + } + exporter, err = otlpmetricgrpc.New(ctx, opts...) + if err != nil { + return nil, fmt.Errorf("failed to create gRPC exporter: %w", err) + } + + case ProtocolHTTPProtobuf: + opts := config.HTTPOptions + if config.Endpoint != "" { + opts = append([]otlpmetrichttp.Option{otlpmetrichttp.WithEndpoint(config.Endpoint)}, opts...) + } + // Configure temporality if provided (default is cumulative, which is Prometheus-compatible) + if config.TemporalitySelector != nil { + opts = append(opts, otlpmetrichttp.WithTemporalitySelector(config.TemporalitySelector)) + } + exporter, err = otlpmetrichttp.New(ctx, opts...) + if err != nil { + return nil, fmt.Errorf("failed to create HTTP exporter: %w", err) + } + + default: + return nil, fmt.Errorf("unsupported protocol: %s", config.Protocol) + } + + // Configure histogram aggregation if exponential histograms are enabled + var providerOpts []sdkmetric.Option + providerOpts = append(providerOpts, sdkmetric.WithResource(res)) + providerOpts = append(providerOpts, sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter, + sdkmetric.WithInterval(config.ExportInterval), + sdkmetric.WithTimeout(config.ExportTimeout), + ))) + + if config.ExponentialHistogram { + // Configure exponential histogram aggregation for all histogram instruments + view := sdkmetric.NewView( + sdkmetric.Instrument{Kind: sdkmetric.InstrumentKindHistogram}, + sdkmetric.Stream{ + Aggregation: sdkmetric.AggregationBase2ExponentialHistogram{ + MaxSize: config.ExponentialHistogramMaxSize, + MaxScale: config.ExponentialHistogramMaxScale, + }, + }, + ) + providerOpts = append(providerOpts, sdkmetric.WithView(view)) + } + + // Create meter provider with configured options + provider := sdkmetric.NewMeterProvider(providerOpts...) + + return &SDKHandler{ + provider: provider, + shutdownCtx: ctx, + instruments: make(map[string]instrument), + }, nil +} + +// NewSDKHandlerFromEnv creates a handler using only environment variables. +// This is the simplest way to create a handler with full OpenTelemetry support. +// +// It respects all standard OTEL environment variables including: +// - OTEL_EXPORTER_OTLP_ENDPOINT +// - OTEL_EXPORTER_OTLP_PROTOCOL +// - OTEL_EXPORTER_OTLP_HEADERS +// - OTEL_RESOURCE_ATTRIBUTES +// - OTEL_SERVICE_NAME +func NewSDKHandlerFromEnv(ctx context.Context) (*SDKHandler, error) { + // The SDK exporters will automatically read all environment variables + return NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, // Can be overridden by OTEL_EXPORTER_OTLP_PROTOCOL + }) +} + +// HandleMeasures implements stats.Handler +func (h *SDKHandler) HandleMeasures(t time.Time, measures ...stats.Measure) { + // Use background context for recording metrics to avoid context cancellation issues + // The shutdownCtx is only used for shutdown operations + ctx := context.Background() + + meter := h.provider.Meter("github.com/segmentio/stats") + + for _, measure := range measures { + for _, field := range measure.Fields { + metricName := measure.Name + "." + field.Name + attrs := h.tagsToAttributes(measure.Tags) + + h.mu.RLock() + inst, exists := h.instruments[metricName] + h.mu.RUnlock() + + if !exists { + h.mu.Lock() + // Double-check after acquiring write lock + inst, exists = h.instruments[metricName] + if !exists { + inst = h.createInstruments(meter, metricName, field.Type()) + h.instruments[metricName] = inst + } + h.mu.Unlock() + } + + h.recordMetric(ctx, inst, field, metricName, attrs) + } + } +} + +// createInstruments creates OTel instruments based on field type +func (h *SDKHandler) createInstruments(meter otelmetric.Meter, name string, fieldType stats.FieldType) instrument { + var inst instrument + + switch fieldType { + case stats.Counter: + counter, err := meter.Int64Counter(name) + if err != nil { + log.Printf("stats/otlp: failed to create counter %s: %v", name, err) + } + inst.counter = counter + + case stats.Gauge: + // Use Float64Gauge for gauges (synchronous gauge instrument) + gauge, err := meter.Float64Gauge(name) + if err != nil { + log.Printf("stats/otlp: failed to create gauge %s: %v", name, err) + } + inst.gauge = gauge + + case stats.Histogram: + histogram, err := meter.Float64Histogram(name) + if err != nil { + log.Printf("stats/otlp: failed to create histogram %s: %v", name, err) + } + inst.histogram = histogram + } + + return inst +} + +// recordMetric records a metric value to the appropriate instrument +func (h *SDKHandler) recordMetric(ctx context.Context, inst instrument, field stats.Field, metricName string, attrs []attribute.KeyValue) { + opts := otelmetric.WithAttributes(attrs...) + + switch field.Type() { + case stats.Counter: + if inst.counter != nil { + inst.counter.Add(ctx, h.valueToInt64(field.Value), opts) + } + + case stats.Gauge: + if inst.gauge != nil { + // Gauges record instantaneous values directly + inst.gauge.Record(ctx, h.valueToFloat64(field.Value), opts) + } + + case stats.Histogram: + if inst.histogram != nil { + inst.histogram.Record(ctx, h.valueToFloat64(field.Value), opts) + } + } +} + +// tagsToAttributes converts stats tags to OTel attributes +func (h *SDKHandler) tagsToAttributes(tags []stats.Tag) []attribute.KeyValue { + attrs := make([]attribute.KeyValue, 0, len(tags)) + for _, tag := range tags { + attrs = append(attrs, attribute.String(tag.Name, tag.Value)) + } + return attrs +} + +// valueToInt64 converts stats.Value to int64 for counters +func (h *SDKHandler) valueToInt64(v stats.Value) int64 { + switch v.Type() { + case stats.Bool: + if v.Bool() { + return 1 + } + return 0 + case stats.Int: + return v.Int() + case stats.Uint: + return int64(v.Uint()) + case stats.Float: + return int64(v.Float()) + case stats.Duration: + return int64(v.Duration().Nanoseconds()) + } + return 0 +} + +// valueToFloat64 converts stats.Value to float64 for gauges and histograms +func (h *SDKHandler) valueToFloat64(v stats.Value) float64 { + switch v.Type() { + case stats.Bool: + if v.Bool() { + return 1.0 + } + return 0.0 + case stats.Int: + return float64(v.Int()) + case stats.Uint: + return float64(v.Uint()) + case stats.Float: + return v.Float() + case stats.Duration: + return v.Duration().Seconds() + } + return 0.0 +} + +// Flush implements stats.Flusher +func (h *SDKHandler) Flush() { + if err := h.provider.ForceFlush(h.shutdownCtx); err != nil { + log.Printf("stats/otlp: failed to flush: %v", err) + } +} + +// Shutdown gracefully shuts down the handler and exports any remaining metrics +func (h *SDKHandler) Shutdown(ctx context.Context) error { + return h.provider.Shutdown(ctx) +} diff --git a/otlp/sdk_handler_test.go b/otlp/sdk_handler_test.go new file mode 100644 index 0000000..cb6995a --- /dev/null +++ b/otlp/sdk_handler_test.go @@ -0,0 +1,361 @@ +package otlp + +import ( + "context" + "testing" + "time" + + "github.com/segmentio/stats/v5" +) + +func TestSDKHandler_HandleMeasures(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create handler with gRPC protocol + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + Endpoint: "localhost:4317", + ExportInterval: 1 * time.Second, + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + // Test counter + handler.HandleMeasures(now, stats.Measure{ + Name: "test.counter", + Fields: []stats.Field{stats.MakeField("count", 1, stats.Counter)}, + Tags: []stats.Tag{{Name: "env", Value: "test"}}, + }) + + // Test gauge + handler.HandleMeasures(now, stats.Measure{ + Name: "test.gauge", + Fields: []stats.Field{stats.MakeField("value", 42.5, stats.Gauge)}, + Tags: []stats.Tag{{Name: "env", Value: "test"}}, + }) + + // Test histogram + handler.HandleMeasures(now, stats.Measure{ + Name: "test.histogram", + Fields: []stats.Field{stats.MakeField("duration", 100, stats.Histogram)}, + Tags: []stats.Tag{{Name: "env", Value: "test"}}, + }) + + // Flush metrics + handler.Flush() + + // Verify instruments were created + if len(handler.instruments) != 3 { + t.Errorf("expected 3 instruments, got %d", len(handler.instruments)) + } +} + +func TestSDKHandler_HTTP(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create handler with HTTP protocol + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolHTTPProtobuf, + Endpoint: "localhost:4318", + ExportInterval: 1 * time.Second, + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + // Test basic metric + handler.HandleMeasures(now, stats.Measure{ + Name: "http.test", + Fields: []stats.Field{stats.MakeField("requests", 10, stats.Counter)}, + Tags: []stats.Tag{{Name: "method", Value: "GET"}}, + }) + + handler.Flush() +} + +func TestSDKHandler_FromEnv(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // This test demonstrates using environment variables + // In real usage, OTEL_EXPORTER_OTLP_ENDPOINT and other vars would be set + handler, err := NewSDKHandlerFromEnv(ctx) + if err != nil { + t.Fatalf("failed to create handler from env: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + handler.HandleMeasures(now, stats.Measure{ + Name: "env.test", + Fields: []stats.Field{stats.MakeField("value", 1, stats.Counter)}, + Tags: []stats.Tag{{Name: "source", Value: "env"}}, + }) +} + +func TestSDKHandler_MultipleMetrics(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + Endpoint: "localhost:4317", + ExportInterval: 1 * time.Second, + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + // Send multiple measures in one call + handler.HandleMeasures(now, + stats.Measure{ + Name: "app.requests", + Fields: []stats.Field{stats.MakeField("count", 100, stats.Counter)}, + Tags: []stats.Tag{{Name: "status", Value: "200"}}, + }, + stats.Measure{ + Name: "app.requests", + Fields: []stats.Field{stats.MakeField("count", 10, stats.Counter)}, + Tags: []stats.Tag{{Name: "status", Value: "404"}}, + }, + stats.Measure{ + Name: "app.latency", + Fields: []stats.Field{ + stats.MakeField("p50", 50, stats.Histogram), + stats.MakeField("p99", 200, stats.Histogram), + }, + Tags: []stats.Tag{{Name: "endpoint", Value: "/api/users"}}, + }, + ) + + handler.Flush() + + // Should have created 4 instruments: + // app.requests.count (2 tag variations share same instrument) + // app.latency.p50 + // app.latency.p99 + if len(handler.instruments) < 3 { + t.Errorf("expected at least 3 instruments, got %d", len(handler.instruments)) + } +} + +func TestSDKHandler_ValueConversion(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + Endpoint: "localhost:4317", + ExportInterval: 1 * time.Second, + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + // Test different value types + testCases := []struct { + name string + value interface{} + fieldType stats.FieldType + }{ + {"int", int(42), stats.Counter}, + {"uint", uint(42), stats.Counter}, + {"float", float64(42.5), stats.Gauge}, + {"duration", time.Second, stats.Histogram}, + {"bool_true", true, stats.Counter}, + {"bool_false", false, stats.Counter}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + handler.HandleMeasures(now, stats.Measure{ + Name: "conversion.test", + Fields: []stats.Field{stats.MakeField(tc.name, tc.value, tc.fieldType)}, + Tags: []stats.Tag{{Name: "type", Value: tc.name}}, + }) + }) + } + + handler.Flush() +} + +func TestSDKHandler_GaugeBehavior(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + Endpoint: "localhost:4317", + ExportInterval: 1 * time.Second, + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + // Test that gauges maintain absolute values, not cumulative + // Set gauge to 100 + handler.HandleMeasures(now, stats.Measure{ + Name: "test.gauge", + Fields: []stats.Field{stats.MakeField("value", 100, stats.Gauge)}, + Tags: []stats.Tag{{Name: "test", Value: "gauge"}}, + }) + + // Set gauge to 50 (should be 50, not 150) + handler.HandleMeasures(now.Add(time.Second), stats.Measure{ + Name: "test.gauge", + Fields: []stats.Field{stats.MakeField("value", 50, stats.Gauge)}, + Tags: []stats.Tag{{Name: "test", Value: "gauge"}}, + }) + + // Set gauge to 75 (should be 75, not 125 or 225) + handler.HandleMeasures(now.Add(2*time.Second), stats.Measure{ + Name: "test.gauge", + Fields: []stats.Field{stats.MakeField("value", 75, stats.Gauge)}, + Tags: []stats.Tag{{Name: "test", Value: "gauge"}}, + }) + + // Gauges now use native Float64Gauge which maintains absolute values directly + // No need to track internal state - the OTel SDK handles this + + handler.Flush() + + // Verify instrument was created + if len(handler.instruments) < 1 { + t.Errorf("expected at least 1 instrument, got %d", len(handler.instruments)) + } +} + +func TestSDKHandler_ExponentialHistogram(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create handler with exponential histogram enabled + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + Endpoint: "localhost:4317", + ExportInterval: 1 * time.Second, + ExponentialHistogram: true, + ExponentialHistogramMaxSize: 160, + ExponentialHistogramMaxScale: 20, + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + // Test histogram with exponential aggregation + handler.HandleMeasures(now, stats.Measure{ + Name: "request.duration", + Fields: []stats.Field{stats.MakeField("ms", 100, stats.Histogram)}, + Tags: []stats.Tag{{Name: "endpoint", Value: "/api/users"}}, + }) + + handler.HandleMeasures(now.Add(time.Millisecond), stats.Measure{ + Name: "request.duration", + Fields: []stats.Field{stats.MakeField("ms", 250, stats.Histogram)}, + Tags: []stats.Tag{{Name: "endpoint", Value: "/api/users"}}, + }) + + handler.HandleMeasures(now.Add(2*time.Millisecond), stats.Measure{ + Name: "request.duration", + Fields: []stats.Field{stats.MakeField("ms", 150, stats.Histogram)}, + Tags: []stats.Tag{{Name: "endpoint", Value: "/api/users"}}, + }) + + handler.Flush() + + // Verify instrument was created + if len(handler.instruments) < 1 { + t.Errorf("expected at least 1 instrument, got %d", len(handler.instruments)) + } +} + +func TestSDKHandler_CumulativeTemporality(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create handler with default (cumulative) temporality + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + Endpoint: "localhost:4317", + ExportInterval: 1 * time.Second, + // TemporalitySelector: nil means default cumulative temporality + }) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + + // Test counter - should accumulate + handler.HandleMeasures(now, stats.Measure{ + Name: "requests", + Fields: []stats.Field{stats.MakeField("count", 10, stats.Counter)}, + Tags: []stats.Tag{{Name: "endpoint", Value: "/api"}}, + }) + + handler.HandleMeasures(now.Add(time.Second), stats.Measure{ + Name: "requests", + Fields: []stats.Field{stats.MakeField("count", 15, stats.Counter)}, + Tags: []stats.Tag{{Name: "endpoint", Value: "/api"}}, + }) + + // With cumulative temporality, counters accumulate (10 + 15 = 25 total) + // The SDK handles this internally + + handler.Flush() + + // Verify instrument was created + if len(handler.instruments) < 1 { + t.Errorf("expected at least 1 instrument, got %d", len(handler.instruments)) + } +} + +func BenchmarkSDKHandler_HandleMeasures(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + handler, err := NewSDKHandler(ctx, SDKConfig{ + Protocol: ProtocolGRPC, + Endpoint: "localhost:4317", + ExportInterval: 10 * time.Second, + }) + if err != nil { + b.Fatalf("failed to create handler: %v", err) + } + defer handler.Shutdown(ctx) + + now := time.Now() + measure := stats.Measure{ + Name: "benchmark.test", + Fields: []stats.Field{stats.MakeField("count", 1, stats.Counter)}, + Tags: []stats.Tag{{Name: "env", Value: "bench"}}, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + handler.HandleMeasures(now, measure) + } +} diff --git a/version/version.go b/version/version.go index 263615f..eb4aaab 100644 --- a/version/version.go +++ b/version/version.go @@ -6,7 +6,7 @@ import ( "sync" ) -const Version = "5.8.0" +const Version = "5.9.0" var ( vsnOnce sync.Once