Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compact: Added support for no-compact markers in planner. #3409

Merged
merged 2 commits into from
Nov 6, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
compact: Added support for no-compact markers in planner.
The planner algo was adapted to avoid unnecessary changes to
blocks caused by excluded blocks, so we can quickly switch to
different planning logic in next iteration.

Fixes: #1424

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Nov 4, 2020
commit 8cf0af5e6dd92a95767b566e364fe4a5e3d12093
4 changes: 3 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func runCompact(
component,
)
api := blocksAPI.NewBlocksAPI(logger, conf.label, flagsMap)
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, bkt)
var sy *compact.Syncer
{
// Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability.
Expand All @@ -229,6 +230,7 @@ func runCompact(
block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
noCompactMarkerFilter,
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels)},
)
cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
Expand Down Expand Up @@ -280,7 +282,7 @@ func runCompact(

grouper := compact.NewDefaultGrouper(logger, bkt, conf.acceptMalformedIndex, enableVerticalCompaction, reg, blocksMarkedForDeletion, garbageCollectedBlocks)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(logger, sy, grouper, compact.NewTSDBBasedPlanner(logger, levels), comp, compactDir, bkt, conf.compactionConcurrency)
compactor, err := compact.NewBucketCompactor(logger, sy, grouper, compact.NewPlanner(logger, levels, noCompactMarkerFilter), comp, compactDir, bkt, conf.compactionConcurrency)
if err != nil {
cancel()
return errors.Wrap(err, "create bucket compactor")
Expand Down
27 changes: 16 additions & 11 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ const (
// but don't have a replacement block yet.
markedForDeletionMeta = "marked-for-deletion"

// MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric.
MarkedForNoCompactionMeta = "marked-for-no-compact"

// Modified label values.
replicaRemovedMeta = "replica-label-removed"
)
Expand Down Expand Up @@ -111,6 +114,7 @@ func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics {
[]string{timeExcludedMeta},
[]string{duplicateMeta},
[]string{markedForDeletionMeta},
[]string{MarkedForNoCompactionMeta},
)
m.modified = extprom.NewTxGaugeVec(
reg,
Expand Down Expand Up @@ -782,19 +786,20 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark)

for id := range metas {
deletionMark, err := metadata.ReadDeletionMark(ctx, f.bkt, f.logger, id.String())
if err == metadata.ErrorDeletionMarkNotFound {
continue
}
if errors.Cause(err) == metadata.ErrorUnmarshalDeletionMark {
level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err)
continue
}
if err != nil {
m := &metadata.DeletionMark{}
if err := metadata.ReadMarker(ctx, f.logger, f.bkt, id.String(), m); err != nil {
if errors.Cause(err) == metadata.ErrorMarkerNotFound {
continue
}
if errors.Cause(err) == metadata.ErrorUnmarshalMarker {
level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err)
continue
}
return err
}
f.deletionMarkMap[id] = deletionMark
if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > f.delay.Seconds() {

f.deletionMarkMap[id] = m
if time.Since(time.Unix(m.DeletionTime, 0)).Seconds() > f.delay.Seconds() {
synced.WithLabelValues(markedForDeletionMeta).Inc()
delete(metas, id)
}
Expand Down
76 changes: 0 additions & 76 deletions pkg/block/metadata/deletionmark.go

This file was deleted.

81 changes: 0 additions & 81 deletions pkg/block/metadata/deletionmark_test.go

This file was deleted.

115 changes: 115 additions & 0 deletions pkg/block/metadata/markers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package metadata

import (
"context"
"encoding/json"
"io/ioutil"
"path"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
)

const (
// DeletionMarkFilename is the known json filename for optional file storing details about when block is marked for deletion.
// If such file is present in block dir, it means the block is meant to be deleted after certain delay.
DeletionMarkFilename = "deletion-mark.json"
// NoCompactMarkFilename is the known json filename for optional file storing details about why block has to be excluded from compaction.
// If such file is present in block dir, it means the block has to excluded from compaction (both vertical and horizontal) or rewrite (e.g deletions).
NoCompactMarkFilename = "no-compact-mark.json"

// DeletionMarkVersion1 is the version of deletion-mark file supported by Thanos.
DeletionMarkVersion1 = 1
// NoCompactMarkVersion1 is the version of no-compact-mark file supported by Thanos.
NoCompactMarkVersion1 = 1
)

var (
// ErrorMarkerNotFound is the error when marker file is not found.
ErrorMarkerNotFound = errors.New("marker not found")
// ErrorUnmarshalMarker is the error when unmarshalling marker JSON file.
// This error can occur because marker has been partially uploaded to block storage
// or the marker file is not a valid json file.
ErrorUnmarshalMarker = errors.New("unmarshal marker JSON")
)

type Marker interface {
markerFilename() string
}

// DeletionMark stores block id and when block was marked for deletion.
type DeletionMark struct {
// ID of the tsdb block.
ID ulid.ULID `json:"id"`
// Version of the file.
Version int `json:"version"`

// DeletionTime is a unix timestamp of when the block was marked to be deleted.
DeletionTime int64 `json:"deletion_time"`
}

func (m *DeletionMark) markerFilename() string { return DeletionMarkFilename }

// NoCompactReason is a reason for a block to be excluded from compaction.
type NoCompactReason string

const (
// ManualNoCompactReason is a custom reason of excluding from compaction that should be added when no-compact mark is added for unknown/user specified reason.
ManualNoCompactReason NoCompactReason = "manual"
// IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424)
// This reason can be ignored when vertical block sharding will be implemented.
IndexSizeExceedingNoCompactReason = "index-size-exceeding"
)

// NoCompactMark marker stores reason of block being excluded from compaction if needed.
type NoCompactMark struct {
// ID of the tsdb block.
ID ulid.ULID `json:"id"`
// Version of the file.
Version int `json:"version"`

Reason NoCompactReason `json:"reason"`
// Details is a human readable string giving details of reason.
Details string `json:"details"`
}

func (n *NoCompactMark) markerFilename() string { return NoCompactMarkFilename }

// ReadMarker reads the given mark file from <dir>/<marker filename>.json in bucket.
func ReadMarker(ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucketReader, dir string, marker Marker) error {
markerFile := path.Join(dir, marker.markerFilename())
r, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, markerFile)
if err != nil {
if bkt.IsObjNotFoundErr(err) {
return ErrorMarkerNotFound
}
return errors.Wrapf(err, "get file: %s", markerFile)
}
defer runutil.CloseWithLogOnErr(logger, r, "close bkt marker reader")

metaContent, err := ioutil.ReadAll(r)
if err != nil {
return errors.Wrapf(err, "read file: %s", markerFile)
}

if err := json.Unmarshal(metaContent, marker); err != nil {
return errors.Wrapf(ErrorUnmarshalMarker, "file: %s; err: %v", markerFile, err.Error())
}
switch marker.markerFilename() {
case NoCompactMarkFilename:
if version := marker.(*NoCompactMark).Version; version != NoCompactMarkVersion1 {
return errors.Errorf("unexpected no-compact-mark file version %d, expected %d", version, NoCompactMarkVersion1)
}
case DeletionMarkFilename:
if version := marker.(*DeletionMark).Version; version != DeletionMarkVersion1 {
return errors.Errorf("unexpected deletion-mark file version %d, expected %d", version, DeletionMarkVersion1)
}
}
return nil
}
Loading