Skip to content

Commit

Permalink
store: fix lazyRespSet At()
Browse files Browse the repository at this point in the history
Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Sep 8, 2022
1 parent c0d3f46 commit 003c51e
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 35 deletions.
11 changes: 10 additions & 1 deletion pkg/store/proxy_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ type lazyRespSet struct {
bufferedResponsesMtx *sync.Mutex
lastResp *storepb.SeriesResponse

noMoreData bool
noMoreData bool
initialized bool

shardMatcher *storepb.ShardMatcher
}
Expand Down Expand Up @@ -326,6 +327,14 @@ func (l *lazyRespSet) Next() bool {
}

func (l *lazyRespSet) At() *storepb.SeriesResponse {
// We need to wait for at least one response so that we would be able to properly build the heap.
if !l.initialized {
l.Next()
l.initialized = true
return l.lastResp
}

// Next() was called previously.
return l.lastResp
}

Expand Down
60 changes: 28 additions & 32 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,42 +513,38 @@ func TestProxyStore_Series(t *testing.T) {
},
},
} {
for _, strategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} {
if ok := t.Run(fmt.Sprintf("%s/%s", tc.title, strategy), func(t *testing.T) {
q := NewProxyStore(nil,
nil,
func() []Client { return tc.storeAPIs },
component.Query,
tc.selectorLabels,
5*time.Second, strategy,
)

ctx := context.Background()
if len(tc.storeDebugMatchers) > 0 {
ctx = context.WithValue(ctx, StoreMatcherKey, tc.storeDebugMatchers)
}

if ok := t.Run(tc.title, func(t *testing.T) {
for _, strategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} {
if ok := t.Run(string(strategy), func(t *testing.T) {
q := NewProxyStore(nil,
nil,
func() []Client { return tc.storeAPIs },
component.Query,
tc.selectorLabels,
0*time.Second, strategy,
)

ctx := context.Background()
if len(tc.storeDebugMatchers) > 0 {
ctx = context.WithValue(ctx, StoreMatcherKey, tc.storeDebugMatchers)
}

s := newStoreSeriesServer(ctx)
err := q.Series(tc.req, s)
if tc.expectedErr != nil {
testutil.NotOk(t, err)
testutil.Equals(t, tc.expectedErr.Error(), err.Error())
return
}
testutil.Ok(t, err)

seriesEquals(t, tc.expectedSeries, s.SeriesSet)
testutil.Equals(t, tc.expectedWarningsLen, len(s.Warnings), "got %v", s.Warnings)
}); !ok {
s := newStoreSeriesServer(ctx)
err := q.Series(tc.req, s)
if tc.expectedErr != nil {
testutil.NotOk(t, err)
testutil.Equals(t, tc.expectedErr.Error(), err.Error())
return
}
}
testutil.Ok(t, err)

}); !ok {
return
seriesEquals(t, tc.expectedSeries, s.SeriesSet)
testutil.Equals(t, tc.expectedWarningsLen, len(s.Warnings), "got %v", s.Warnings)

}); !ok {
return
}
}

}
}

Expand Down Expand Up @@ -1421,7 +1417,7 @@ func TestProxyStore_LabelNames(t *testing.T) {
func() []Client { return tc.storeAPIs },
component.Query,
nil,
0*time.Second, EagerRetrieval,
5*time.Second, EagerRetrieval,
)

ctx := context.Background()
Expand Down
5 changes: 3 additions & 2 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1557,7 +1557,8 @@ func TestConnectedQueriesWithLazyProxy(t *testing.T) {
testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar, querier1, querier2))
testutil.Ok(t, querier2.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics()))

instantQuery(t, context.Background(), querier2.Endpoint("http"), func() string {
return "up"
result := instantQuery(t, context.Background(), querier2.Endpoint("http"), func() string {
return "sum(up)"
}, time.Now, promclient.QueryOptions{}, 1)
testutil.Equals(t, model.SampleValue(1.0), result[0].Value)
}

0 comments on commit 003c51e

Please sign in to comment.