Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calling handleRPC with context derived from the original #1227

Merged
merged 2 commits into from
May 11, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion call.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
ctx = newContextWithRPCInfo(ctx)
sh := cc.dopts.copts.StatsHandler
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
Expand Down
4 changes: 3 additions & 1 deletion stats/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ type ConnTagInfo struct {
RemoteAddr net.Addr
// LocalAddr is the local address of the corresponding connection.
LocalAddr net.Addr
// TODO add QOS related fields.
}

// RPCTagInfo defines the relevant information needed by RPC context tagger.
// FailFast is only valid on client side, it's always false on server side.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be more appropriate to document this on the field than on the struct.

How far do we think we will be extending this struct? Would it make sense to have sub-structures for client- and server- only fields?

type RPCTagInfo struct {
// FullMethodName is the RPC method in the format of /package.service/method.
FullMethodName string
// FailFast indicates if this RPC is failfast.
FailFast bool
}

// Handler defines the interface for the related stats handling (e.g., RPCs, connections).
Expand Down
9 changes: 5 additions & 4 deletions stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,13 +800,14 @@ func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkF
t.Fatalf("got %v stats, want %v stats", len(got), expectLen)
}

var rpcctx context.Context
var tagInfoInCtx *stats.RPCTagInfo
for i := 0; i < len(got); i++ {
if _, ok := got[i].s.(stats.RPCStats); ok {
if rpcctx != nil && got[i].ctx != rpcctx {
t.Fatalf("got different contexts with stats %T", got[i].s)
tagInfoInCtxNew, _ := got[i].ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo)
if tagInfoInCtx != nil && tagInfoInCtx != tagInfoInCtxNew {
t.Fatalf("got context containing different tagInfo with stats %T", got[i].s)
}
rpcctx = got[i].ctx
tagInfoInCtx = tagInfoInCtxNew
}
}

Expand Down
2 changes: 1 addition & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
ctx = newContextWithRPCInfo(ctx)
sh := cc.dopts.copts.StatsHandler
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
Expand Down
8 changes: 3 additions & 5 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if t.authInfo != nil {
pr.AuthInfo = t.authInfo
}
userCtx := ctx
ctx = peer.NewContext(ctx, pr)
authData := make(map[string]string)
for _, c := range t.creds {
Expand Down Expand Up @@ -401,7 +400,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, ErrConnClosing
}
s := t.newStream(ctx, callHdr)
s.clientStatsCtx = userCtx
t.activeStreams[s.id] = s
// If the number of active streams change from 0 to 1, then check if keepalive
// has gone dormant. If so, wake it up.
Expand Down Expand Up @@ -514,7 +512,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
}
t.statsHandler.HandleRPC(s.clientStatsCtx, outHeader)
t.statsHandler.HandleRPC(s.ctx, outHeader)
}
t.writableChan <- 0
return s, nil
Expand Down Expand Up @@ -995,13 +993,13 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
Client: true,
WireLength: int(frame.Header().Length),
}
t.statsHandler.HandleRPC(s.clientStatsCtx, inHeader)
t.statsHandler.HandleRPC(s.ctx, inHeader)
} else {
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
}
t.statsHandler.HandleRPC(s.clientStatsCtx, inTrailer)
t.statsHandler.HandleRPC(s.ctx, inTrailer)
}
}
}()
Expand Down
5 changes: 0 additions & 5 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,6 @@ type Stream struct {
id uint32
// nil for client side Stream.
st ServerTransport
// clientStatsCtx keeps the user context for stats handling.
// It's only valid on client side. Server side stats context is same as s.ctx.
// All client side stats collection should use the clientStatsCtx (instead of the stream context)
// so that all the generated stats for a particular RPC can be associated in the processing phase.
clientStatsCtx context.Context
// ctx is the associated context of the stream.
ctx context.Context
// cancel is always nil for client side Stream.
Expand Down