Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: use k-way merging for proxying logic #5296

Merged
merged 28 commits into main from kway_merge_heap
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0fb4a18
store: use k-way merging for proxying logic
GiedriusS Apr 24, 2022
8fc05fe
Merge remote-tracking branch 'origin/main' into kway_merge_heap
GiedriusS Apr 24, 2022
0eda550
store: reuse hash + use md5
GiedriusS May 3, 2022
666ed17
store: add full streaming to proxying logic
GiedriusS May 10, 2022
244f2fc
Merge remote-tracking branch 'origin/main' into kway_merge_heap
GiedriusS May 10, 2022
fc4cd7a
proxy: make strategy tunable
GiedriusS May 30, 2022
c3532dd
proxy: fix off-by-1
GiedriusS May 31, 2022
de33878
store: use timers for canceling streaming RPC
GiedriusS Jul 21, 2022
0c2862e
store: use xxhash for hashing
GiedriusS Jul 21, 2022
4614457
Merge remote-tracking branch 'origin/main' into kway_merge_heap
GiedriusS Aug 4, 2022
d9dd9be
store: add sharding support to new code
GiedriusS Aug 4, 2022
2b31e0b
store: implement timeout for lazy proxy
GiedriusS Aug 4, 2022
2ad6080
store: uncomment err tests
GiedriusS Aug 4, 2022
abc57a7
store: bring back statistics
GiedriusS Aug 4, 2022
14884da
store: add some unit tests
GiedriusS Aug 5, 2022
3501342
CHANGELOG: update
GiedriusS Aug 5, 2022
e29ceaa
Merge remote-tracking branch 'origin/main' into kway_merge_heap
GiedriusS Sep 1, 2022
2a09329
*: fix according to comments
GiedriusS Sep 1, 2022
7dbd67e
store: remove Error() method from heap
GiedriusS Sep 1, 2022
6a4a3b3
e2e: try to repro segfault
GiedriusS Sep 1, 2022
a002a7e
store: no storeAPIs matched is always a warning
GiedriusS Sep 1, 2022
fe06cf1
e2e: fix test after recent changes
GiedriusS Sep 1, 2022
07f680f
query: remove unused constants
GiedriusS Sep 1, 2022
9ce4e0f
store: wire up proxy strategy
GiedriusS Sep 1, 2022
c0d3f46
e2e: add repro for Filip's case
GiedriusS Sep 2, 2022
003c51e
store: fix lazyRespSet At()
GiedriusS Sep 8, 2022
2d28e05
Merge remote-tracking branch 'origin/main' into kway_merge_heap
GiedriusS Sep 8, 2022
caad5c3
storepb: remove unused method
GiedriusS Sep 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
store: add sharding support to new code
Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Aug 4, 2022
commit d9dd9be8c739a7dfa988db9d62968a11c7c1beb2
3 changes: 2 additions & 1 deletion pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
QueryHints: originalRequest.QueryHints,
PartialResponseDisabled: originalRequest.PartialResponseDisabled,
PartialResponseStrategy: originalRequest.PartialResponseStrategy,
ShardInfo: originalRequest.ShardInfo,
}

stores := []Client{}
Expand All @@ -297,7 +298,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.

storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved

respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, EagerRetrieval)
respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, LazyRetrieval, st.SupportsSharding(), &s.buffers, r.ShardInfo, reqLogger)
if err != nil {
level.Error(reqLogger).Log("err", err)

Expand Down
39 changes: 38 additions & 1 deletion pkg/store/proxy_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (
"sync"
"time"

"github.com/go-kit/log"

"github.com/cespare/xxhash/v2"
"github.com/go-kit/log/level"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tracing"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand Down Expand Up @@ -293,6 +296,8 @@ type lazyRespSet struct {
err error
errMtx sync.Mutex
noMoreData bool

shardMatcher *storepb.ShardMatcher
}

func (l *lazyRespSet) Err() error {
Expand Down Expand Up @@ -342,6 +347,8 @@ func newLazyRespSet(
st Client,
closeSeries context.CancelFunc,
cl storepb.Store_SeriesClient,
shardMatcher *storepb.ShardMatcher,
applySharding bool,
) respSet {
bufferedResponses := []*storepb.SeriesResponse{}
bufferedResponsesMtx := &sync.Mutex{}
Expand All @@ -357,6 +364,7 @@ func newLazyRespSet(
dataOrFinishEvent: dataAvailable,
bufferedResponsesMtx: bufferedResponsesMtx,
bufferedResponses: bufferedResponses,
shardMatcher: shardMatcher,
}

go func(st Client, l *lazyRespSet) {
Expand Down Expand Up @@ -419,6 +427,10 @@ func newLazyRespSet(
return false
}

if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
}

l.bufferedResponsesMtx.Lock()
l.bufferedResponses = append(l.bufferedResponses, resp)
l.dataOrFinishEvent.Signal()
Expand Down Expand Up @@ -449,7 +461,7 @@ const (
EagerRetrieval RetrievalStrategy = "eager"
)

func newAsyncRespSet(ctx context.Context, st Client, req *storepb.SeriesRequest, frameTimeout time.Duration, retrievalStrategy RetrievalStrategy) (respSet, error) {
func newAsyncRespSet(ctx context.Context, st Client, req *storepb.SeriesRequest, frameTimeout time.Duration, retrievalStrategy RetrievalStrategy, storeSupportsSharding bool, buffers *sync.Pool, shardInfo *storepb.ShardInfo, logger log.Logger) (respSet, error) {
var span opentracing.Span
var closeSeries context.CancelFunc

Expand All @@ -469,6 +481,15 @@ func newAsyncRespSet(ctx context.Context, st Client, req *storepb.SeriesRequest,

seriesCtx, closeSeries = context.WithCancel(seriesCtx)

shardMatcher := shardInfo.Matcher(buffers)
defer shardMatcher.Close()

applySharding := shardInfo != nil && !storeSupportsSharding
if applySharding {
msg := "Applying series sharding in the proxy since there is not support in the underlying store"
level.Debug(logger).Log("msg", msg, "store", st.String())
}

cl, err := st.Series(seriesCtx, req)
if err != nil {
err = errors.Wrapf(err, "fetch series for %s %s", storeID, st)
Expand All @@ -488,6 +509,8 @@ func newAsyncRespSet(ctx context.Context, st Client, req *storepb.SeriesRequest,
st,
closeSeries,
cl,
shardMatcher,
applySharding,
), nil
case EagerRetrieval:
return newEagerRespSet(
Expand All @@ -497,6 +520,8 @@ func newAsyncRespSet(ctx context.Context, st Client, req *storepb.SeriesRequest,
st,
closeSeries,
cl,
shardMatcher,
applySharding,
), nil
default:
panic(fmt.Sprintf("unsupported retrieval strategy %s", retrievalStrategy))
Expand All @@ -510,6 +535,8 @@ func (l *lazyRespSet) Close() {
l.closeSeries()
l.noMoreData = true
l.dataOrFinishEvent.Signal()

l.shardMatcher.Close()
}

// eagerRespSet is a SeriesSet that blocks until all data is retrieved from
Expand All @@ -524,6 +551,8 @@ type eagerRespSet struct {
st Client
frameTimeout time.Duration

shardMatcher *storepb.ShardMatcher

// Internal bookkeeping.
err error
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
bufferedResponses []*storepb.SeriesResponse
Expand All @@ -538,6 +567,8 @@ func newEagerRespSet(
st Client,
closeSeries context.CancelFunc,
cl storepb.Store_SeriesClient,
shardMatcher *storepb.ShardMatcher,
applySharding bool,
) respSet {
ret := &eagerRespSet{
span: span,
Expand All @@ -548,6 +579,7 @@ func newEagerRespSet(
ctx: ctx,
bufferedResponses: []*storepb.SeriesResponse{},
wg: &sync.WaitGroup{},
shardMatcher: shardMatcher,
}

ret.wg.Add(1)
Expand Down Expand Up @@ -584,6 +616,10 @@ func newEagerRespSet(
return false
}

if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
}

l.bufferedResponses = append(l.bufferedResponses, resp)
return true
}
Expand All @@ -605,6 +641,7 @@ func newEagerRespSet(
}

// Close closes the shard matcher held by this eager response set.
func (l *eagerRespSet) Close() {
	matcher := l.shardMatcher
	matcher.Close()
}

func (l *eagerRespSet) At() *storepb.SeriesResponse {
Expand Down
6 changes: 3 additions & 3 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
func() []Client { return cls },
component.Query,
nil,
0*time.Second,
1*time.Second,
)

ctx := context.Background()
Expand All @@ -1111,12 +1111,12 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}},
PartialResponseDisabled: true,
PartialResponseDisabled: false,
Aggregates: []storepb.Aggr{
storepb.Aggr_COUNTER,
storepb.Aggr_COUNT,
},
PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT,
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
MaxResolutionWindow: 1234,
}
testutil.Ok(t, q.Series(req, s))
Expand Down