Skip to content

Commit

Permalink
Add client warnings and abort to tombstone and coordinator reads whic…
Browse files Browse the repository at this point in the history
…h go past a low/high watermark

patch by David Capwell; reviewed by Blake Eggleston, Marcus Eriksson for CASSANDRA-16850
  • Loading branch information
dcapwell committed Aug 26, 2021
1 parent ad24942 commit 4ec4ab9
Show file tree
Hide file tree
Showing 44 changed files with 1,758 additions and 60 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
4.1
* Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark (CASSANDRA-16850)
* Add TTL support to nodetool snapshots (CASSANDRA-16789)
* Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842)
* allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
Expand Down
7 changes: 7 additions & 0 deletions NEWS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ using the provided 'sstableupgrade' tool.

New features
------------
- Warn/abort thresholds added to read queries notifying clients when these thresholds trigger (by
emitting a client warning or aborting the query). This feature is disabled by default, scheduled
to be enabled in 4.2; it is controlled with the configuration client_track_warnings_enabled,
setting to true will enable this feature. Each check has its own warn/abort thresholds, currently
tombstones (tombstone_warn_threshold, and tombstone_failure_threshold) and coordinator result set
materialized size (client_large_read_warn_threshold_kb, and client_large_read_abort_threshold_kb)
are supported; more checks will be added over time.

Upgrading
---------
Expand Down
14 changes: 14 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1453,3 +1453,17 @@ enable_drop_compact_storage: false
# subnets:
# - 127.0.0.1
# - 127.0.0.0/31

# Enables tracking warnings/aborts across all replicas for reporting back to client.
# Scheduled to enable in 4.2
# See: CASSANDRA-16850
# See: tombstone_warn_threshold, tombstone_failure_threshold, client_large_read_warn_threshold_kb, and client_large_read_abort_threshold_kb
#client_track_warnings_enabled: false

# When client_track_warnings_enabled: true, this tracks the materialized size of a query on the
# coordinator. If client_large_read_warn_threshold_kb is greater than 0, this will emit a warning
# to clients with details on what query triggered this as well as the size of the result set; if
# client_large_read_abort_threshold_kb is greater than 0, this will abort the query after it
# has exceeded this threshold, returning a read error to the user.
#client_large_read_warn_threshold_kb: 0
#client_large_read_abort_threshold_kb: 0
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,16 @@ public class Config
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;

public volatile long client_large_read_warn_threshold_kb = 0;
public volatile long client_large_read_abort_threshold_kb = 0;

public final ReplicaFilteringProtectionOptions replica_filtering_protection = new ReplicaFilteringProtectionOptions();

public volatile Long index_summary_capacity_in_mb;
public volatile int index_summary_resize_interval_in_minutes = 60;

public volatile boolean client_track_warnings_enabled = false; // should set to true in 4.2

public int gc_log_threshold_in_ms = 200;
public int gc_warn_threshold_in_ms = 1000;

Expand Down
33 changes: 33 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,9 @@ else if (config.concurrent_validations > config.concurrent_compactors && !allowU
throw new ConfigurationException("To set concurrent_validations > concurrent_compactors, " +
"set the system property cassandra.allow_unlimited_concurrent_validations=true");
}

conf.client_large_read_warn_threshold_kb = Math.max(conf.client_large_read_warn_threshold_kb, 0);
conf.client_large_read_abort_threshold_kb = Math.max(conf.client_large_read_abort_threshold_kb, 0);
}

@VisibleForTesting
Expand Down Expand Up @@ -3443,4 +3446,34 @@ public static SubnetGroups getInternodeErrorReportingExclusions()
{
return conf.internode_error_reporting_exclusions;
}

public static long getClientLargeReadWarnThresholdKB()
{
return conf.client_large_read_warn_threshold_kb;
}

public static void setClientLargeReadWarnThresholdKB(long threshold)
{
conf.client_large_read_warn_threshold_kb = Math.max(threshold, 0);
}

public static long getClientLargeReadAbortThresholdKB()
{
return conf.client_large_read_abort_threshold_kb;
}

public static void setClientLargeReadAbortThresholdKB(long threshold)
{
conf.client_large_read_abort_threshold_kb = Math.max(threshold, 0);
}

public static boolean getClientTrackWarningsEnabled()
{
return conf.client_track_warnings_enabled;
}

public static void setClientTrackWarningsEnabled(boolean value)
{
conf.client_track_warnings_enabled = value;
}
}
106 changes: 106 additions & 0 deletions src/java/org/apache/cassandra/cql3/QueryOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import io.netty.buffer.ByteBuf;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.UTF8Type;
Expand Down Expand Up @@ -216,11 +217,103 @@ public int getNowInSeconds(QueryState state)
// Mainly for the sake of BatchQueryOptions
abstract SpecificOptions getSpecificOptions();

abstract TrackWarnings getTrackWarnings();

public boolean isClientTrackWarningsEnabled()
{
return getTrackWarnings().isEnabled();
}

public long getClientLargeReadWarnThresholdKb()
{
return getTrackWarnings().getClientLargeReadWarnThresholdKb();
}

public long getClientLargeReadAbortThresholdKB()
{
return getTrackWarnings().getClientLargeReadAbortThresholdKB();
}

public QueryOptions prepare(List<ColumnSpecification> specs)
{
return this;
}

interface TrackWarnings
{
boolean isEnabled();

long getClientLargeReadWarnThresholdKb();

long getClientLargeReadAbortThresholdKB();

static TrackWarnings create()
{
// if daemon initialization hasn't happened yet (very common in tests) then ignore
if (!DatabaseDescriptor.isDaemonInitialized())
return DisabledTrackWarnings.INSTANCE;
boolean enabled = DatabaseDescriptor.getClientTrackWarningsEnabled();
if (!enabled)
return DisabledTrackWarnings.INSTANCE;
long clientLargeReadWarnThresholdKb = DatabaseDescriptor.getClientLargeReadWarnThresholdKB();
long clientLargeReadAbortThresholdKB = DatabaseDescriptor.getClientLargeReadAbortThresholdKB();
return new DefaultTrackWarnings(clientLargeReadWarnThresholdKb, clientLargeReadAbortThresholdKB);
}
}

private enum DisabledTrackWarnings implements TrackWarnings
{
INSTANCE;

@Override
public boolean isEnabled()
{
return false;
}

@Override
public long getClientLargeReadWarnThresholdKb()
{
return 0;
}

@Override
public long getClientLargeReadAbortThresholdKB()
{
return 0;
}
}

private static class DefaultTrackWarnings implements TrackWarnings
{
private final long clientLargeReadWarnThresholdKb;
private final long clientLargeReadAbortThresholdKB;

public DefaultTrackWarnings(long clientLargeReadWarnThresholdKb, long clientLargeReadAbortThresholdKB)
{
this.clientLargeReadWarnThresholdKb = clientLargeReadWarnThresholdKb;
this.clientLargeReadAbortThresholdKB = clientLargeReadAbortThresholdKB;
}

@Override
public boolean isEnabled()
{
return true;
}

@Override
public long getClientLargeReadWarnThresholdKb()
{
return clientLargeReadWarnThresholdKb;
}

@Override
public long getClientLargeReadAbortThresholdKB()
{
return clientLargeReadAbortThresholdKB;
}
}

static class DefaultQueryOptions extends QueryOptions
{
private final ConsistencyLevel consistency;
Expand All @@ -230,6 +323,7 @@ static class DefaultQueryOptions extends QueryOptions
private final SpecificOptions options;

private final transient ProtocolVersion protocolVersion;
private final transient TrackWarnings trackWarnings = TrackWarnings.create();

DefaultQueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, SpecificOptions options, ProtocolVersion protocolVersion)
{
Expand Down Expand Up @@ -264,6 +358,12 @@ SpecificOptions getSpecificOptions()
{
return options;
}

@Override
TrackWarnings getTrackWarnings()
{
return trackWarnings;
}
}

static class QueryOptionsWrapper extends QueryOptions
Expand Down Expand Up @@ -300,6 +400,12 @@ SpecificOptions getSpecificOptions()
return wrapped.getSpecificOptions();
}

@Override
TrackWarnings getTrackWarnings()
{
return wrapped.getTrackWarnings();
}

@Override
public QueryOptions prepare(List<ColumnSpecification> specs)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public final class ResultSetBuilder
final long[] timestamps;
final int[] ttls;

private long size = 0;
private boolean sizeWarningEmitted = false;

public ResultSetBuilder(ResultMetadata metadata, Selectors selectors)
{
this(metadata, selectors, null);
Expand All @@ -79,6 +82,30 @@ public ResultSetBuilder(ResultMetadata metadata, Selectors selectors, GroupMaker
Arrays.fill(ttls, -1);
}

private void addSize(List<ByteBuffer> row)
{
for (int i=0, isize=row.size(); i<isize; i++)
{
ByteBuffer value = row.get(i);
size += value != null ? value.remaining() : 0;
}
}

public boolean shouldWarn(long thresholdKB)
{
if (thresholdKB > 0 && !sizeWarningEmitted && size > thresholdKB << 10)
{
sizeWarningEmitted = true;
return true;
}
return false;
}

public boolean shouldReject(long thresholdKB)
{
return thresholdKB > 0 && size > thresholdKB << 10;
}

public void add(ByteBuffer v)
{
current.add(v);
Expand Down Expand Up @@ -166,6 +193,8 @@ public ResultSet build()

private List<ByteBuffer> getOutputRow()
{
return selectors.getOutputRow();
List<ByteBuffer> row = selectors.getOutputRow();
addSize(row);
return row;
}
}
Loading

0 comments on commit 4ec4ab9

Please sign in to comment.