Skip to content

Commit

Permalink
Calling handleRPC with different context derived from the original co…
Browse files Browse the repository at this point in the history
…ntext
  • Loading branch information
menghanl committed May 9, 2017
1 parent ffa4ec7 commit f5341a2
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 17 deletions.
2 changes: 1 addition & 1 deletion call.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
ctx = newContextWithRPCInfo(ctx)
sh := cc.dopts.copts.StatsHandler
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
Expand Down
4 changes: 3 additions & 1 deletion stats/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ type ConnTagInfo struct {
RemoteAddr net.Addr
// LocalAddr is the local address of the corresponding connection.
LocalAddr net.Addr
// TODO add QOS related fields.
}

// RPCTagInfo defines the relevant information needed by RPC context tagger.
// FailFast is only valid on client side, it's always false on server side.
type RPCTagInfo struct {
// FullMethodName is the RPC method in the format of /package.service/method.
FullMethodName string
// FailFast indicates if this RPC is failfast.
FailFast bool
}

// Handler defines the interface for the related stats handling (e.g., RPCs, connections).
Expand Down
9 changes: 5 additions & 4 deletions stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,13 +800,14 @@ func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkF
t.Fatalf("got %v stats, want %v stats", len(got), expectLen)
}

var rpcctx context.Context
var tagInfoInCtx *stats.RPCTagInfo
for i := 0; i < len(got); i++ {
if _, ok := got[i].s.(stats.RPCStats); ok {
if rpcctx != nil && got[i].ctx != rpcctx {
t.Fatalf("got different contexts with stats %T", got[i].s)
tagInfoInCtxNew, _ := got[i].ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo)
if tagInfoInCtx != nil && tagInfoInCtx != tagInfoInCtxNew {
t.Fatalf("got context containing different tagInfo with stats %T", got[i].s)
}
rpcctx = got[i].ctx
tagInfoInCtx = tagInfoInCtxNew
}
}

Expand Down
2 changes: 1 addition & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
ctx = newContextWithRPCInfo(ctx)
sh := cc.dopts.copts.StatsHandler
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
Expand Down
8 changes: 3 additions & 5 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if t.authInfo != nil {
pr.AuthInfo = t.authInfo
}
userCtx := ctx
ctx = peer.NewContext(ctx, pr)
authData := make(map[string]string)
for _, c := range t.creds {
Expand Down Expand Up @@ -401,7 +400,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, ErrConnClosing
}
s := t.newStream(ctx, callHdr)
s.clientStatsCtx = userCtx
t.activeStreams[s.id] = s
// If the number of active streams change from 0 to 1, then check if keepalive
// has gone dormant. If so, wake it up.
Expand Down Expand Up @@ -514,7 +512,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
}
t.statsHandler.HandleRPC(s.clientStatsCtx, outHeader)
t.statsHandler.HandleRPC(s.ctx, outHeader)
}
t.writableChan <- 0
return s, nil
Expand Down Expand Up @@ -995,13 +993,13 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
Client: true,
WireLength: int(frame.Header().Length),
}
t.statsHandler.HandleRPC(s.clientStatsCtx, inHeader)
t.statsHandler.HandleRPC(s.ctx, inHeader)
} else {
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
}
t.statsHandler.HandleRPC(s.clientStatsCtx, inTrailer)
t.statsHandler.HandleRPC(s.ctx, inTrailer)
}
}
}()
Expand Down
5 changes: 0 additions & 5 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,6 @@ type Stream struct {
id uint32
// nil for client side Stream.
st ServerTransport
// clientStatsCtx keeps the user context for stats handling.
// It's only valid on client side. Server side stats context is same as s.ctx.
// All client side stats collection should use the clientStatsCtx (instead of the stream context)
// so that all the generated stats for a particular RPC can be associated in the processing phase.
clientStatsCtx context.Context
// ctx is the associated context of the stream.
ctx context.Context
// cancel is always nil for client side Stream.
Expand Down

0 comments on commit f5341a2

Please sign in to comment.