Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: use k-way merging for proxying logic #5296

Merged
merged 28 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0fb4a18
store: use k-way merging for proxying logic
GiedriusS Apr 24, 2022
8fc05fe
Merge remote-tracking branch 'origin/main' into kway_merge_heap
GiedriusS Apr 24, 2022
0eda550
store: reuse hash + use md5
GiedriusS May 3, 2022
666ed17
store: add full streaming to proxying logic
GiedriusS May 10, 2022
244f2fc
Merge remote-tracking branch 'origin/main' into kway_merge_heap
GiedriusS May 10, 2022
fc4cd7a
proxy: make strategy tunable
GiedriusS May 30, 2022
c3532dd
proxy: fix off-by-1
GiedriusS May 31, 2022
de33878
store: use timers for canceling streaming RPC
GiedriusS Jul 21, 2022
0c2862e
store: use xxhash for hashing
GiedriusS Jul 21, 2022
4614457
Merge remote-tracking branch 'origin/main' into kway_merge_heap
GiedriusS Aug 4, 2022
d9dd9be
store: add sharding support to new code
GiedriusS Aug 4, 2022
2b31e0b
store: implement timeout for lazy proxy
GiedriusS Aug 4, 2022
2ad6080
store: uncomment err tests
GiedriusS Aug 4, 2022
abc57a7
store: bring back statistics
GiedriusS Aug 4, 2022
14884da
store: add some unit tests
GiedriusS Aug 5, 2022
3501342
CHANGELOG: update
GiedriusS Aug 5, 2022
e29ceaa
Merge remote-tracking branch 'origin/main' into kway_merge_heap
GiedriusS Sep 1, 2022
2a09329
*: fix according to comments
GiedriusS Sep 1, 2022
7dbd67e
store: remove Error() method from heap
GiedriusS Sep 1, 2022
6a4a3b3
e2e: try to repro segfault
GiedriusS Sep 1, 2022
a002a7e
store: no storeAPIs matched is always a warning
GiedriusS Sep 1, 2022
fe06cf1
e2e: fix test after recent changes
GiedriusS Sep 1, 2022
07f680f
query: remove unused constants
GiedriusS Sep 1, 2022
9ce4e0f
store: wire up proxy strategy
GiedriusS Sep 1, 2022
c0d3f46
e2e: add repro for Filip's case
GiedriusS Sep 2, 2022
003c51e
store: fix lazyRespSet At()
GiedriusS Sep 8, 2022
2d28e05
Merge remote-tracking branch 'origin/main' into kway_merge_heap
GiedriusS Sep 8, 2022
caad5c3
storepb: remove unused method
GiedriusS Sep 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
store: add full streaming to proxying logic
Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed May 10, 2022
commit 666ed1758adbea3d6cf9cde70dc076f9f25e2243
227 changes: 53 additions & 174 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package store
import (
"context"
"fmt"
"io"
"math"
"strings"
"sync"
Expand All @@ -24,14 +23,10 @@ import (
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/strutil"
"github.com/thanos-io/thanos/pkg/tracing"

grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tracing"
"github.com/opentracing/opentracing-go"
)

type ctxKey int
Expand Down Expand Up @@ -238,27 +233,20 @@ func (s cancelableRespSender) send(r *storepb.SeriesResponse) {
}
}

type recvResponse struct {
r *storepb.SeriesResponse
err error
}

func frameCtx(responseTimeout time.Duration) (context.Context, context.CancelFunc) {
frameTimeoutCtx := context.Background()
var cancel context.CancelFunc
func frameCtx(ctx context.Context, responseTimeout time.Duration) (context.Context, context.CancelFunc) {
frameTimeoutCtx := ctx
if responseTimeout != 0 {
frameTimeoutCtx, cancel = context.WithTimeout(frameTimeoutCtx, responseTimeout)
return frameTimeoutCtx, cancel
return context.WithTimeout(frameTimeoutCtx, responseTimeout)
}
return frameTimeoutCtx, func() {}
}

func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
// TODO(bwplotka): This should be part of request logger, otherwise it does not make much sense. Also, could be
// triggered by tracing span to reduce cognitive load.
reqLogger := log.With(s.logger, "component", "proxy", "request", r.String())
reqLogger := log.With(s.logger, "component", "proxy", "request", originalRequest.String())

match, matchers, err := matchesExternalLabels(r.Matchers, s.selectorLabels)
match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand All @@ -270,183 +258,43 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
}
storeMatchers, _ := storepb.PromMatchersToMatchers(matchers...) // Error would be returned by matchesExternalLabels, so skip check.

wg := &sync.WaitGroup{}

storeDebugMsgs := []string{}
actualRequest := &storepb.SeriesRequest{
MinTime: r.MinTime,
MaxTime: r.MaxTime,
r := &storepb.SeriesRequest{
MinTime: originalRequest.MinTime,
MaxTime: originalRequest.MaxTime,
Matchers: storeMatchers,
Aggregates: r.Aggregates,
MaxResolutionWindow: r.MaxResolutionWindow,
SkipChunks: r.SkipChunks,
QueryHints: r.QueryHints,
PartialResponseDisabled: r.PartialResponseDisabled,
PartialResponseStrategy: r.PartialResponseStrategy,
Aggregates: originalRequest.Aggregates,
MaxResolutionWindow: originalRequest.MaxResolutionWindow,
SkipChunks: originalRequest.SkipChunks,
QueryHints: originalRequest.QueryHints,
PartialResponseDisabled: originalRequest.PartialResponseDisabled,
PartialResponseStrategy: originalRequest.PartialResponseStrategy,
}

stores := []Client{}
for _, st := range s.stores() {
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(srv.Context(), st, r.MinTime, r.MaxTime, matchers...); !ok {
if ok, reason := storeMatches(srv.Context(), st, originalRequest.MinTime, originalRequest.MaxTime, matchers...); !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out: %v", st, reason))
continue
}

stores = append(stores, st)
}

wg.Add(len(stores))

var (
errs = errutil.MultiError{}
errMtx sync.Mutex
storeResponses = make([]struct {
responses []*storepb.SeriesResponse
}, len(stores))
storeResponses = make([]*lazyRespSet, len(stores))
)

for i, st := range stores {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved

go func(i int, st Client) {
defer wg.Done()

storeID := labelpb.PromLabelSetsToString(st.LabelSets())
if storeID == "" {
storeID = "Store Gateway"
}

seriesCtx, closeSeries := context.WithCancel(srv.Context())
seriesCtx = grpc_opentracing.ClientAddContextTags(seriesCtx, opentracing.Tags{
"target": st.Addr(),
})
defer closeSeries()

span, seriesCtx := tracing.StartSpan(seriesCtx, "proxy.series", tracing.Tags{
"store.id": storeID,
"store.addr": st.Addr(),
})
cl, err := st.Series(seriesCtx, actualRequest)
if err != nil {
err = errors.Wrapf(err, "fetch series for %s %s", storeID, st)
errMtx.Lock()
errs.Add(err)
errMtx.Unlock()

span.SetTag("err", err.Error())
span.Finish()
return
}

rCh := make(chan *recvResponse)
done := make(chan struct{})
go func() {
for {
// TODO: we can buffer this in files to have infinitely
// scalable read path.
// TODO: this sorely needs some limit on the number of series
// per each request.
r, err := cl.Recv()
select {
case <-done:
close(rCh)
return
case rCh <- &recvResponse{r: r, err: err}:
}
}
}()

// The `defer` only executed when function return, we do `defer cancel` in for loop,
// so make the loop body as a function, release timers created by context as early.
handleRecvResponse := func() (next bool) {
frameTimeoutCtx, cancel := frameCtx(s.responseTimeout)
defer cancel()
var rr *recvResponse
select {
case <-seriesCtx.Done():
err := errors.Wrapf(seriesCtx.Err(), "failed to receive any data from %s", st.String())

errMtx.Lock()
errs.Add(err)
errMtx.Unlock()

span.SetTag("err", err.Error())
span.Finish()
close(done)
return false
case <-frameTimeoutCtx.Done():
err := errors.Wrapf(frameTimeoutCtx.Err(), "failed to receive any data in %v from %s", s.responseTimeout, st.String())

errMtx.Lock()
errs.Add(err)
errMtx.Unlock()

span.SetTag("err", err.Error())
span.Finish()
close(done)
return false
case rr = <-rCh:
}

if rr.err == io.EOF {
span.Finish()
close(done)
return false
}

if rr.err != nil {
err := errors.Wrapf(rr.err, "receive series from %s", st.String())
errMtx.Lock()
errs.Add(err)
errMtx.Unlock()

span.SetTag("err", err.Error())
span.Finish()
close(done)
return false
}

storeResponses[i].responses = append(storeResponses[i].responses, rr.r)
return true
}

for {
if !handleRecvResponse() {
return
}
}
}(i, st)
}

level.Debug(reqLogger).Log("msg", "Series: started fanout streams", "status", strings.Join(storeDebugMsgs, ";"))

wg.Wait()

if (actualRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT || actualRequest.PartialResponseDisabled) && errs.Err() != nil {
level.Error(reqLogger).Log("err", errs.Err(), "msg", "partial response disabled; aborting request")

return errs.Err()
} else if errs.Err() != nil && (actualRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_WARN || !actualRequest.PartialResponseDisabled) {
for _, rerr := range errs {
if err := srv.Send(storepb.NewWarnSeriesResponse(rerr)); err != nil {
level.Error(reqLogger).Log("err", err)

return status.Error(codes.Unknown, errors.Wrap(err, "send warning response").Error())
}
}
}

seriesSets := []*respSet{}
for _, resp := range storeResponses {
if len(resp.responses) == 0 {
s.metrics.emptyStreamResponses.Inc()
continue
}
seriesSets = append(seriesSets, &respSet{responses: resp.responses})
st := st
storeResponses[i] = newLazyRespSet(srv.Context(), st, r, s.responseTimeout)
defer storeResponses[i].Close()
}

if len(seriesSets) == 0 {
if len(stores) == 0 {
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
err := errors.New("No StoreAPIs matched for this query")
level.Warn(reqLogger).Log("err", err, "stores", strings.Join(storeDebugMsgs, ";"))
if sendErr := srv.Send(storepb.NewWarnSeriesResponse(err)); sendErr != nil {
Expand All @@ -458,7 +306,9 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
return nil
}

respHeap := NewDedupResponseHeap(NewProxyResponseHeap(seriesSets...))
level.Debug(reqLogger).Log("msg", "Series: started fanout streams", "status", strings.Join(storeDebugMsgs, ";"))

respHeap := NewDedupResponseHeap(NewProxyResponseHeap(storeResponses...))

for respHeap.Next() {
resp := respHeap.At()
Expand All @@ -468,6 +318,35 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
}
}

for _, respSet := range storeResponses {
if respSet.Err() == nil {
continue
}

if !r.PartialResponseDisabled || r.PartialResponseStrategy == storepb.PartialResponseStrategy_WARN {
if err := srv.Send(storepb.NewWarnSeriesResponse(respSet.Err())); err != nil {
return errors.Wrap(err, "send series response")
}
} else {
storeID := labelpb.PromLabelSetsToString(respSet.st.LabelSets())
if storeID == "" {
storeID = "Store Gateway"
}

return errors.Wrapf(respSet.Err(), "fetch series for %s %s", storeID, respSet.st)
}
}

if respHeap.Err() != nil {
if !r.PartialResponseDisabled || r.PartialResponseStrategy == storepb.PartialResponseStrategy_WARN {
if err := srv.Send(storepb.NewWarnSeriesResponse(respHeap.Err())); err != nil {
return errors.Wrap(err, "send series response")
}
} else {
return status.Error(codes.Unknown, respHeap.Err().Error())
}
}

return nil
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down
Loading