Skip to content

Commit

Permalink
Expose streaming as a vtable
Browse files Browse the repository at this point in the history
patch by David Capwell; reviewed by Dinesh Joshi, Paulo Motta for CASSANDRA-17390
  • Loading branch information
dcapwell committed Mar 1, 2022
1 parent 445e49a commit e87a1e0
Show file tree
Hide file tree
Showing 25 changed files with 1,214 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
4.1
* Expose streaming as a vtable (CASSANDRA-17390)
* Make startup checks configurable (CASSANDRA-17220)
* Add guardrail for number of partition keys on IN queries (CASSANDRA-17186)
* update Python test framework from nose to pytest (CASSANDRA-17293)
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,8 @@ public static void setClientMode(boolean clientMode)
public volatile Set<String> table_properties_disallowed = Collections.emptySet();
public volatile boolean user_timestamps_enabled = true;
public volatile boolean read_before_write_list_operations_enabled = true;
public volatile DurationSpec streaming_state_expires = DurationSpec.inDays(3);
public volatile DataStorageSpec streaming_state_size = DataStorageSpec.inMebibytes(40);

/** The configuration of startup checks. */
public volatile Map<StartupCheckType, Map<String, Object>> startup_checks = new HashMap<>();
Expand Down
28 changes: 28 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -3885,4 +3885,32 @@ public static void setForceNewPreparedStatementBehaviour(boolean value)
conf.force_new_prepared_statement_behaviour = value;
}
}

public static DurationSpec getStreamingStateExpires()
{
return conf.streaming_state_expires;
}

public static void setStreamingStateExpires(DurationSpec duration)
{
if (!conf.streaming_state_expires.equals(Objects.requireNonNull(duration, "duration")))
{
logger.info("Setting streaming_state_expires to {}", duration);
conf.streaming_state_expires = duration;
}
}

public static DataStorageSpec getStreamingStateSize()
{
return conf.streaming_state_size;
}

public static void setStreamingStateSize(DataStorageSpec duration)
{
if (!conf.streaming_state_size.equals(Objects.requireNonNull(duration, "duration")))
{
logger.info("Setting streaming_state_size to {}", duration);
conf.streaming_state_size = duration;
}
}
}
24 changes: 24 additions & 0 deletions src/java/org/apache/cassandra/config/DurationSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,19 @@ public DurationSpec(String value, TimeUnit minUnit)
}
}

// get vs no-get prefix is not consistent in the code base, but for classes involved with config parsing, it is
// imporant to be explicit about get/set as this changes how parsing is done; this class is a data-type, so is
// not nested, having get/set can confuse parsing thinking this is a nested type
public long quantity()
{
return quantity;
}

public TimeUnit unit()
{
return unit;
}

/**
* Creates a {@code DurationSpec} of the specified amount of milliseconds.
*
Expand Down Expand Up @@ -178,6 +191,17 @@ public static DurationSpec inHours(long hours)
return new DurationSpec(hours, HOURS);
}

/**
* Creates a {@code DurationSpec} of the specified amount of days.
*
* @param days the amount of days
* @return a duration
*/
public static DurationSpec inDays(long days)
{
return new DurationSpec(days, DAYS);
}

/**
* Creates a {@code DurationSpec} of the specified amount of seconds. Custom method for special cases.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ public void truncate()
throw new InvalidRequestException("Truncation is not supported by table " + metadata);
}

@Override
public String toString()
{
return metadata().toString();
}

public interface DataSet
{
boolean isEmpty();
Expand Down
6 changes: 4 additions & 2 deletions src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,10 @@ private Row(TableMetadata metadata, Clustering<?> clustering)
private void add(String columnName, Object value)
{
ColumnMetadata column = metadata.getColumn(ByteBufferUtil.bytes(columnName));
if (null == column || !column.isRegular())
throw new IllegalArgumentException();
if (column == null)
throw new IllegalArgumentException("Unknown column: " + columnName);
if (!column.isRegular())
throw new IllegalArgumentException(String.format("Expect a regular column %s, but got %s", columnName, column.kind));
values.put(column, value);
}

Expand Down
109 changes: 109 additions & 0 deletions src/java/org/apache/cassandra/db/virtual/StreamingVirtualTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.virtual;

import java.util.Date;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamingState;

import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;

public class StreamingVirtualTable extends AbstractVirtualTable
{
public StreamingVirtualTable(String keyspace)
{
super(parse("CREATE TABLE streaming (" +
" id uuid,\n" +
" follower boolean,\n" +
" operation text, \n" +
" peers frozen<list<text>>,\n" +
" status text,\n" +
" progress_percentage float,\n" +
" last_updated_at timestamp,\n" +
" duration_millis bigint,\n" +
" failure_cause text,\n" +
" success_message text,\n" +
"\n" +
StreamingState.Sessions.columns() +
"\n" +
stateColumns() +
"\n" +
"PRIMARY KEY ((id))" +
")", keyspace)
.kind(TableMetadata.Kind.VIRTUAL)
.build());
}

private static String stateColumns()
{
StringBuilder sb = new StringBuilder();
for (StreamingState.Status state : StreamingState.Status.values())
sb.append(" status_").append(state.name().toLowerCase()).append("_timestamp timestamp,\n");
return sb.toString();
}

@Override
public DataSet data()
{
SimpleDataSet result = new SimpleDataSet(metadata());
StreamManager.instance.getStreamingStates()
.forEach(s -> updateDataSet(result, s));
return result;
}

@Override
public DataSet data(DecoratedKey partitionKey)
{
UUID id = UUIDType.instance.compose(partitionKey.getKey());
SimpleDataSet result = new SimpleDataSet(metadata());
StreamingState state = StreamManager.instance.getStreamingState(id);
if (state != null)
updateDataSet(result, state);
return result;
}

private void updateDataSet(SimpleDataSet ds, StreamingState state)
{
ds.row(state.id());
ds.column("last_updated_at", new Date(state.lastUpdatedAtMillis())); // read early to see latest state
ds.column("follower", state.follower());
ds.column("operation", state.operation().getDescription());
ds.column("peers", state.peers().stream().map(Object::toString).collect(Collectors.toList()));
ds.column("status", state.status().name().toLowerCase());
ds.column("progress_percentage", round(state.progress() * 100));
ds.column("duration_millis", state.durationMillis());
ds.column("failure_cause", state.failureCause());
ds.column("success_message", state.successMessage());
for (Map.Entry<StreamingState.Status, Long> e : state.stateTimesMillis().entrySet())
ds.column("status_" + e.getKey().name().toLowerCase() + "_timestamp", new Date(e.getValue()));

state.sessions().update(ds);
}

static float round(float value)
{
return Math.round(value * 100) / 100;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ private SystemViewsKeyspace()
.add(new RolesCacheKeysTable(VIRTUAL_VIEWS))
.add(new CQLMetricsTable(VIRTUAL_VIEWS))
.add(new BatchMetricsTable(VIRTUAL_VIEWS))
.add(new StreamingVirtualTable(VIRTUAL_VIEWS))
.build());
}
}
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/service/CassandraDaemon.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.security.ThreadAwareSecurityManager;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JMXServerUtils;
import org.apache.cassandra.utils.JVMStabilityInspector;
Expand Down Expand Up @@ -370,6 +371,7 @@ protected void setup()
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance, 30, sizeRecorderInterval, TimeUnit.SECONDS);

ActiveRepairService.instance.start();
StreamManager.instance.start();

// Prepared statements
QueryProcessor.instance.preloadPreparedStatements();
Expand Down
11 changes: 7 additions & 4 deletions src/java/org/apache/cassandra/streaming/SessionInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public final class SessionInfo implements Serializable
/** Current session state */
public final StreamSession.State state;

private final Map<String, ProgressInfo> receivingFiles;
private final Map<String, ProgressInfo> sendingFiles;
private final Map<String, ProgressInfo> receivingFiles = new ConcurrentHashMap<>();
private final Map<String, ProgressInfo> sendingFiles = new ConcurrentHashMap<>();

public SessionInfo(InetSocketAddress peer,
int sessionIndex,
Expand All @@ -59,11 +59,14 @@ public SessionInfo(InetSocketAddress peer,
this.connecting = connecting;
this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
this.receivingFiles = new ConcurrentHashMap<>();
this.sendingFiles = new ConcurrentHashMap<>();
this.state = state;
}

public SessionInfo(SessionInfo other)
{
this(other.peer, other.sessionIndex, other.connecting, other.receivingSummaries, other.sendingSummaries, other.state);
}

public boolean isFailed()
{
return state == StreamSession.State.FAILED;
Expand Down
Loading

0 comments on commit e87a1e0

Please sign in to comment.