Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 committed May 17, 2021
1 parent e4c4f82 commit f85a231
Show file tree
Hide file tree
Showing 9 changed files with 897 additions and 873 deletions.
10 changes: 8 additions & 2 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,12 @@ type ReplicaLabelRemover struct {
logger log.Logger

replicaLabels []string
dedupEnabled bool
}

// NewReplicaLabelRemover creates a ReplicaLabelRemover.
func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string) *ReplicaLabelRemover {
return &ReplicaLabelRemover{logger: logger, replicaLabels: replicaLabels}
func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string, dedupEnabled bool) *ReplicaLabelRemover {
return &ReplicaLabelRemover{logger: logger, replicaLabels: replicaLabels, dedupEnabled: dedupEnabled}
}

// Modify modifies external labels of existing blocks, it removes given replica labels from the metadata of blocks that have it.
Expand All @@ -711,6 +712,11 @@ func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*met
}

for u, meta := range metas {
// Skip downsampled blocks for now if penalty based deduplication is enabled.
// TODO: remove this after downsampled blocks are supported.
if r.dedupEnabled && meta.Thanos.Downsample.Resolution != int64(0) {
continue
}
l := meta.Thanos.Labels
for _, replicaLabel := range r.replicaLabels {
if _, exists := l[replicaLabel]; exists {
Expand Down
6 changes: 3 additions & 3 deletions pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ func TestReplicaLabelRemover_Modify(t *testing.T) {
ULID(3): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something1"}}},
},
modified: 0,
replicaLabelRemover: NewReplicaLabelRemover(log.NewNopLogger(), []string{"replica", "rule_replica"}),
replicaLabelRemover: NewReplicaLabelRemover(log.NewNopLogger(), []string{"replica", "rule_replica"}, false),
},
{
name: "with replica labels",
Expand All @@ -916,7 +916,7 @@ func TestReplicaLabelRemover_Modify(t *testing.T) {
ULID(4): {Thanos: metadata.Thanos{Labels: map[string]string{"replica": "deduped"}}},
},
modified: 5.0,
replicaLabelRemover: NewReplicaLabelRemover(log.NewNopLogger(), []string{"replica", "rule_replica"}),
replicaLabelRemover: NewReplicaLabelRemover(log.NewNopLogger(), []string{"replica", "rule_replica"}, false),
},
{
name: "no replica label specified in the ReplicaLabelRemover",
Expand All @@ -931,7 +931,7 @@ func TestReplicaLabelRemover_Modify(t *testing.T) {
ULID(3): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something1"}}},
},
modified: 0,
replicaLabelRemover: NewReplicaLabelRemover(log.NewNopLogger(), []string{}),
replicaLabelRemover: NewReplicaLabelRemover(log.NewNopLogger(), []string{}, false),
},
} {
m := newTestFetcherMetrics()
Expand Down
9 changes: 9 additions & 0 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ const (
ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2)
)

const (
// DedupAlgorithmCompact is the original compactor series merge algorithm with 1:1 deduplication.
DedupAlgorithmCompact = "compact"

// DedupAlgorithmPenalty is the penalty based compactor series merge algorithm.
// This is the same as the online deduplication of querier except counter reset handling.
DedupAlgorithmPenalty = "penalty"
)

// Syncer synchronizes block metas from a bucket into a local directory.
// It sorts them into compaction groups based on equal label sets.
type Syncer struct {
Expand Down
122 changes: 4 additions & 118 deletions pkg/compact/dedup.go → pkg/dedup/chunk_iter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package compact
package dedup

import (
"container/heap"
Expand Down Expand Up @@ -131,10 +128,11 @@ func (d *dedupChunksIterator) Next() bool {
iter = (&seriesToChunkEncoder{Series: &storage.SeriesEntry{
Lset: nil,
SampleIteratorFn: func() chunkenc.Iterator {
it := newChunkToSeriesDecoder(nil, d.curr).Iterator()
var it adjustableSeriesIterator
it = noopAdjustableSeriesIterator{newChunkToSeriesDecoder(nil, d.curr).Iterator()}

for _, o := range overlapping {
it = newDedupSamplesIterator(it, o.Iterator())
it = newDedupSeriesIterator(it, noopAdjustableSeriesIterator{o.Iterator()})
}
return it
},
Expand All @@ -156,118 +154,6 @@ func (d *dedupChunksIterator) Err() error {
return d.err
}

type dedupSamplesIterator struct {
a, b chunkenc.Iterator

aok, bok bool

// TODO(bwplotka): Don't base on LastT, but on detected scrape interval. This will allow us to be more
// responsive to gaps: https://github.com/thanos-io/thanos/issues/981, let's do it in next PR.
lastT int64

penA, penB int64
useA bool
}

func newDedupSamplesIterator(a, b chunkenc.Iterator) *dedupSamplesIterator {
return &dedupSamplesIterator{
a: a,
b: b,
lastT: math.MinInt64,
aok: a.Next(),
bok: b.Next(),
}
}

func (it *dedupSamplesIterator) Next() bool {
// Advance both iterators to at least the next highest timestamp plus the potential penalty.
if it.aok {
it.aok = it.a.Seek(it.lastT + 1 + it.penA)
}
if it.bok {
it.bok = it.b.Seek(it.lastT + 1 + it.penB)
}

// Handle basic cases where one iterator is exhausted before the other.
if !it.aok {
it.useA = false
if it.bok {
it.lastT, _ = it.b.At()
it.penB = 0
}
return it.bok
}
if !it.bok {
it.useA = true
it.lastT, _ = it.a.At()
it.penA = 0
return true
}
// General case where both iterators still have data. We pick the one
// with the smaller timestamp.
// The applied penalty potentially already skipped potential samples already
// that would have resulted in exaggerated sampling frequency.
ta, _ := it.a.At()
tb, _ := it.b.At()

it.useA = ta <= tb

// For the series we didn't pick, add a penalty twice as high as the delta of the last two
// samples to the next seek against it.
// This ensures that we don't pick a sample too close, which would increase the overall
// sample frequency. It also guards against clock drift and inaccuracies during
// timestamp assignment.
// If we don't know a delta yet, we pick 5000 as a constant, which is based on the knowledge
// that timestamps are in milliseconds and sampling frequencies typically multiple seconds long.
const initialPenalty = 5000

if it.useA {
if it.lastT != math.MinInt64 {
it.penB = 2 * (ta - it.lastT)
} else {
it.penB = initialPenalty
}
it.penA = 0
it.lastT = ta
return true
}
if it.lastT != math.MinInt64 {
it.penA = 2 * (tb - it.lastT)
} else {
it.penA = initialPenalty
}
it.penB = 0
it.lastT = tb
return true
}

func (it *dedupSamplesIterator) Seek(t int64) bool {
// Don't use underlying Seek, but iterate over next to not miss gaps.
for {
ts, _ := it.At()
if ts >= t {
return true
}
if !it.Next() {
return false
}
}
}

func (it *dedupSamplesIterator) At() (int64, float64) {
if it.useA {
return it.a.At()
}
return it.b.At()
}

func (it *dedupSamplesIterator) Err() error {
if it.a.Err() != nil {
return it.a.Err()
}
return it.b.Err()
}

type seriesToChunkEncoder struct {
storage.Series
}
Expand Down
Loading

0 comments on commit f85a231

Please sign in to comment.