Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats/opentelemetry: CSM Observability client side component changes #7256

Merged
merged 6 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 70 additions & 10 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ import (
"time"

"google.golang.org/grpc"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
otelattribute "go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
)

type clientStatsHandler struct {
Expand All @@ -49,11 +51,11 @@ func (csh *clientStatsHandler) initializeMetrics() {

setOfMetrics := csh.o.MetricsOptions.Metrics.metrics

csh.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, metric.WithUnit("attempt"), metric.WithDescription("Number of client call attempts started."))
csh.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, metric.WithUnit("s"), metric.WithDescription("End-to-end time taken to complete a client call attempt."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
csh.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes sent per client call attempt."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
csh.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes received per call attempt."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
csh.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, metric.WithUnit("s"), metric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
csh.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
csh.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
csh.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
csh.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
csh.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
}

func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
Expand All @@ -63,6 +65,15 @@ func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method stri
}
ctx = setCallInfo(ctx, ci)

if csh.o.MetricsOptions.pluginOption != nil {
md := csh.o.MetricsOptions.pluginOption.GetMetadata()
for k, v := range md {
if len(v) == 1 {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
ctx = metadata.AppendToOutgoingContext(ctx, k, v[0])
}
}
}

startTime := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
csh.perCallMetrics(ctx, err, startTime, ci)
Expand Down Expand Up @@ -98,6 +109,16 @@ func (csh *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc
method: csh.determineMethod(method, opts...),
}
ctx = setCallInfo(ctx, ci)

if csh.o.MetricsOptions.pluginOption != nil {
md := csh.o.MetricsOptions.pluginOption.GetMetadata()
for k, v := range md {
if len(v) == 1 {
ctx = metadata.AppendToOutgoingContext(ctx, k, v[0])
}
}
}

startTime := time.Now()

callback := func(err error) {
Expand All @@ -110,7 +131,7 @@ func (csh *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc
func (csh *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
s := status.Convert(err)
callLatency := float64(time.Since(startTime)) / float64(time.Second)
csh.clientMetrics.callDuration.Record(ctx, callLatency, metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target), attribute.String("grpc.status", canonicalString(s.Code()))))
csh.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target), otelattribute.String("grpc.status", canonicalString(s.Code()))))
}

// TagConn exists to satisfy stats.Handler.
Expand All @@ -123,8 +144,21 @@ func (csh *clientStatsHandler) HandleConn(context.Context, stats.ConnStats) {}

// TagRPC implements per RPC attempt context management.
func (csh *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
// Numerous stats handlers can be used for the same channel. The cluster
// impl balancer which writes to this will only write once, thus have this
// stats handler's per attempt scoped context point to the same optional
// labels map if set.
var labels *istats.Labels
if labels = istats.GetLabels(ctx); labels == nil {
labels = &istats.Labels{
TelemetryLabels: make(map[string]string),
}
ctx = istats.SetLabels(ctx, labels)
}
mi := &metricsInfo{ // populates information about RPC start.
startTime: time.Now(),
xdsLabels: labels.TelemetryLabels,
method: info.FullMethodName,
}
ri := &rpcInfo{
mi: mi,
Expand All @@ -150,17 +184,27 @@ func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCS
return
}

csh.clientMetrics.attemptStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target)))
csh.clientMetrics.attemptStarted.Add(ctx, 1, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target)))
case *stats.OutPayload:
atomic.AddInt64(&mi.sentCompressedBytes, int64(st.CompressedLength))
case *stats.InPayload:
atomic.AddInt64(&mi.recvCompressedBytes, int64(st.CompressedLength))
case *stats.InHeader:
csh.setLabelsFromPluginOption(mi, st.Header)
case *stats.InTrailer:
csh.setLabelsFromPluginOption(mi, st.Trailer)
case *stats.End:
csh.processRPCEnd(ctx, mi, st)
default:
}
}

func (csh *clientStatsHandler) setLabelsFromPluginOption(mi *metricsInfo, incomingMetadata metadata.MD) {
if mi.pluginOptionLabels != nil && csh.o.MetricsOptions.pluginOption != nil {
mi.pluginOptionLabels = csh.o.MetricsOptions.pluginOption.GetLabels(incomingMetadata)
}
}

func (csh *clientStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInfo, e *stats.End) {
ci := getCallInfo(ctx)
if ci == nil {
Expand All @@ -174,7 +218,23 @@ func (csh *clientStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInf
st = canonicalString(s.Code())
}

clientAttributeOption := metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target), attribute.String("grpc.status", st))
attributes := []otelattribute.KeyValue{
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", st),
}

for k, v := range mi.pluginOptionLabels {
attributes = append(attributes, otelattribute.String(k, v))
}

for _, o := range csh.o.MetricsOptions.OptionalLabels {
if val, ok := mi.xdsLabels[o]; ok {
attributes = append(attributes, otelattribute.String(o, val))
}
}

clientAttributeOption := otelmetric.WithAttributes(attributes...)
csh.clientMetrics.attemptDuration.Record(ctx, latency, clientAttributeOption)
csh.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.sentCompressedBytes), clientAttributeOption)
csh.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.recvCompressedBytes), clientAttributeOption)
Expand Down
11 changes: 11 additions & 0 deletions stats/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
otelinternal "google.golang.org/grpc/stats/opentelemetry/internal"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
Expand Down Expand Up @@ -126,6 +127,13 @@ type MetricsOptions struct {
// grpc.StaticMethodCallOption as a call option into Invoke or NewStream.
// This only applies for server side metrics.
MethodAttributeFilter func(string) bool

// OptionalLabels are labels received from LB Policies that this component
// should add to metrics that record after receiving incoming metadata.
OptionalLabels []string

// pluginOption is used to get labels to attach to certain metrics, if set.
pluginOption otelinternal.PluginOption
}

// DialOption returns a dial option which enables OpenTelemetry instrumentation
Expand Down Expand Up @@ -220,6 +228,9 @@ type metricsInfo struct {

startTime time.Time
method string

pluginOptionLabels map[string]string // pluginOptionLabels to attach to metrics emitted
xdsLabels map[string]string
}

type clientMetrics struct {
Expand Down
Loading