Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor replicate execution method to not iterate the origin bucket #2906

Merged
merged 3 commits into from
Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3331](https://github.com/thanos-io/thanos/pull/3331) Disable Azure blob exception logging
- [#3341](https://github.com/thanos-io/thanos/pull/3341) Disable Azure blob syslog exception logging

### Changed

- [#2906](https://github.com/thanos-io/thanos/pull/2906) Tools: Refactor Bucket replicate execution. Removed all `thanos_replicate_origin_.*` metrics.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically not need to mention refactor, just metric changes (:

- `thanos_replicate_origin_meta_loads_total` can be replaced by `blocks_meta_synced{state="loaded"}`.
- `thanos_replicate_origin_partial_meta_reads_total` can be replaced by `blocks_meta_synced{state="failed"}`.

## [v0.16.0](https://github.com/thanos-io/thanos/releases) - 2020.10.26

Highlights:
Expand Down
12 changes: 2 additions & 10 deletions examples/dashboards/bucket-replicate.json
Original file line number Diff line number Diff line change
Expand Up @@ -305,23 +305,15 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(thanos_replicate_origin_iterations_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "iterations",
"legendLink": null,
"step": 10
},
{
"expr": "sum(rate(thanos_replicate_origin_meta_loads_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))",
"expr": "sum(rate(blocks_meta_synced{state=\"loaded\",namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "meta loads",
"legendLink": null,
"step": 10
},
{
"expr": "sum(rate(thanos_replicate_origin_partial_meta_reads_total{namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))",
"expr": "sum(rate(blocks_meta_synced{state=\"failed\",namespace=\"$namespace\",job=~\"thanos-bucket-replicate.*\"}[$interval]))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "partial meta reads",
Expand Down
7 changes: 3 additions & 4 deletions mixin/dashboards/bucket-replicate.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,13 @@ local g = import '../lib/thanos-grafana-builder/builder.libsonnet';
g.panel('Metrics') +
g.queryPanel(
[
'sum(rate(thanos_replicate_origin_iterations_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
'sum(rate(thanos_replicate_origin_meta_loads_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
'sum(rate(thanos_replicate_origin_partial_meta_reads_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
'sum(rate(blocks_meta_synced{state="loaded",namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
'sum(rate(blocks_meta_synced{state="failed",namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
'sum(rate(thanos_replicate_blocks_already_replicated_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
'sum(rate(thanos_replicate_blocks_replicated_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
'sum(rate(thanos_replicate_objects_replicated_total{namespace="$namespace",%(selector)s}[$interval]))' % thanos.bucket_replicate,
],
['iterations', 'meta loads', 'partial meta reads', 'already replicated blocks', 'replicated blocks', 'replicated objects']
['meta loads', 'partial meta reads', 'already replicated blocks', 'replicated blocks', 'replicated objects']
)
)
)
Expand Down
82 changes: 9 additions & 73 deletions pkg/replicate/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,29 +117,13 @@ type replicationScheme struct {
}

type replicationMetrics struct {
originIterations prometheus.Counter
originMetaLoads prometheus.Counter
originPartialMeta prometheus.Counter

blocksAlreadyReplicated prometheus.Counter
blocksReplicated prometheus.Counter
objectsReplicated prometheus.Counter
}

func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics {
m := &replicationMetrics{
originIterations: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_origin_iterations_total",
Help: "Total number of objects iterated over in the origin bucket.",
}),
originMetaLoads: promauto.With(reg).NewCounter(prometheus.CounterOpts{
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
Name: "thanos_replicate_origin_meta_loads_total",
Help: "Total number of meta.json reads in the origin bucket.",
}),
originPartialMeta: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_origin_partial_meta_reads_total",
Help: "Total number of partial meta reads encountered.",
}),
blocksAlreadyReplicated: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_blocks_already_replicated_total",
Help: "Total number of blocks skipped due to already being replicated.",
Expand Down Expand Up @@ -183,45 +167,20 @@ func newReplicationScheme(
func (rs *replicationScheme) execute(ctx context.Context) error {
availableBlocks := []*metadata.Meta{}

level.Debug(rs.logger).Log("msg", "scanning blocks available blocks for replication")

if err := rs.fromBkt.Iter(ctx, "", func(name string) error {
rs.metrics.originIterations.Inc()

id, ok := thanosblock.IsBlockDir(name)
if !ok {
return nil
}

rs.metrics.originMetaLoads.Inc()

meta, metaNonExistentOrPartial, err := loadMeta(ctx, rs, id)
if metaNonExistentOrPartial {
// meta.json is the last file uploaded by a Thanos shipper,
// therefore a block may be partially present, but no meta.json
// file yet. If this is the case we skip that block for now.
rs.metrics.originPartialMeta.Inc()
level.Info(rs.logger).Log("msg", "block meta not uploaded yet. Skipping.", "block_uuid", id.String())
return nil
}
if err != nil {
return errors.Wrapf(err, "load meta for block %v from origin bucket", id.String())
}
metas, partials, err := rs.fetcher.Fetch(ctx)
if err != nil {
return err
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should log the error if metas != nil && err != nil?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is good to log the error here, but kind of duplicate.

I just check the code, only here returns metas and error at the same time. In this case, partials != nil, so we will log the partial meta errors later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm the if err != nil && metas == nil { feels like code smell to me though. It just looks wrong from outside which can lead to bugs in future:

  • Inside implementation can change, and metas can be produced without partial for some reason - then we will miss an error.
  • Even if we say it makes sense, maybe another person will come here and refactor it anyway as it looks like error can be missed.

On other hand, shouldn't replicate FAIL on any error spotted? Even if we failed to read one block? Sometimes we heavily rely on replication status so this has to be heavily tested and controlled, ideally with failed metric being somewhere (:


if len(meta.Thanos.Labels) == 0 {
// TODO(bwplotka): Allow injecting custom labels as shipper does.
level.Info(rs.logger).Log("msg", "block meta without Thanos external labels set. This is not allowed. Skipping.", "block_uuid", id.String())
return nil
}
for id := range partials {
level.Info(rs.logger).Log("msg", "block meta not uploaded yet. Skipping.", "block_uuid", id.String())
}

for id, meta := range metas {
if rs.blockFilter(meta) {
level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String())
availableBlocks = append(availableBlocks, meta)
}

return nil
}); err != nil {
return errors.Wrap(err, "iterate over origin bucket")
}

// In order to prevent races in compactions by the target environment, we
Expand Down Expand Up @@ -266,6 +225,7 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli
return errors.Wrap(err, "get meta file from target bucket")
}

// TODO(bwplotka): Allow injecting custom labels as shipper does.
originMetaFileContent, err := ioutil.ReadAll(originMetaFile)
if err != nil {
return errors.Wrap(err, "read origin meta file")
Expand Down Expand Up @@ -348,27 +308,3 @@ func (rs *replicationScheme) ensureObjectReplicated(ctx context.Context, objectN

return nil
}

// loadMeta loads the meta.json from the origin bucket and returns the meta
// struct as well as if failed, whether the failure was due to the meta.json
// not being present or partial. The distinction is important, as if missing or
// partial, this is just a temporary failure, as the block is still being
// uploaded to the origin bucket.
func loadMeta(ctx context.Context, rs *replicationScheme, id ulid.ULID) (*metadata.Meta, bool, error) {
metas, _, err := rs.fetcher.Fetch(ctx)
if err != nil {
switch errors.Cause(err) {
default:
return nil, false, errors.Wrap(err, "fetch meta")
case thanosblock.ErrorSyncMetaNotFound:
return nil, true, errors.Wrap(err, "fetch meta")
}
}

m, ok := metas[id]
if !ok {
return nil, true, errors.Wrap(err, "fetch meta")
}

return m, false, nil
}