From e87a1e0c0a19c64ed2edc2d340c0f8af16776e2c Mon Sep 17 00:00:00 2001 From: David Capwell Date: Tue, 1 Mar 2022 13:15:18 -0800 Subject: [PATCH] Expose streaming as a vtable patch by David Capwell; reviewed by Dinesh Joshi, Paulo Motta for CASSANDRA-17390 --- CHANGES.txt | 1 + .../org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 28 ++ .../apache/cassandra/config/DurationSpec.java | 24 + .../db/virtual/AbstractVirtualTable.java | 6 + .../cassandra/db/virtual/SimpleDataSet.java | 6 +- .../db/virtual/StreamingVirtualTable.java | 109 +++++ .../db/virtual/SystemViewsKeyspace.java | 1 + .../cassandra/service/CassandraDaemon.java | 2 + .../cassandra/streaming/SessionInfo.java | 11 +- .../cassandra/streaming/StreamManager.java | 145 +++++- .../streaming/StreamResultFuture.java | 13 +- .../cassandra/streaming/StreamingState.java | 432 ++++++++++++++++++ .../management/StreamEventJMXNotifier.java | 2 +- .../nodetool/formatter/TableBuilder.java | 20 + .../cassandra/distributed/impl/Instance.java | 3 + .../test/streaming/RebuildStreamingTest.java | 96 ++++ .../distributed/util/QueryResultUtil.java | 70 ++- .../org/apache/cassandra/cql3/CQLTester.java | 17 +- .../org/apache/cassandra/db/MmapFileTest.java | 10 + .../cassandra/db/filter/ColumnFilterTest.java | 2 + .../db/virtual/StreamingVirtualTableTest.java | 219 +++++++++ .../apache/cassandra/io/util/FileTest.java | 4 + .../AbstractFilesystemOwnershipCheckTest.java | 9 + .../cassandra/utils/binlog/BinLogTest.java | 9 + 25 files changed, 1214 insertions(+), 27 deletions(-) create mode 100644 src/java/org/apache/cassandra/db/virtual/StreamingVirtualTable.java create mode 100644 src/java/org/apache/cassandra/streaming/StreamingState.java create mode 100644 test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java create mode 100644 test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java diff --git a/CHANGES.txt b/CHANGES.txt index 5580102ad5f7..97ce6c85acec 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1 + * Expose streaming as a vtable (CASSANDRA-17390) * Make startup checks configurable (CASSANDRA-17220) * Add guardrail for number of partition keys on IN queries (CASSANDRA-17186) * update Python test framework from nose to pytest (CASSANDRA-17293) diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 15226cfd03d1..964a5e18906b 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -768,6 +768,8 @@ public static void setClientMode(boolean clientMode) public volatile Set table_properties_disallowed = Collections.emptySet(); public volatile boolean user_timestamps_enabled = true; public volatile boolean read_before_write_list_operations_enabled = true; + public volatile DurationSpec streaming_state_expires = DurationSpec.inDays(3); + public volatile DataStorageSpec streaming_state_size = DataStorageSpec.inMebibytes(40); /** The configuration of startup checks. */ public volatile Map> startup_checks = new HashMap<>(); diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 9f88ce02a67d..65cb3d62be70 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -3885,4 +3885,32 @@ public static void setForceNewPreparedStatementBehaviour(boolean value) conf.force_new_prepared_statement_behaviour = value; } } + + public static DurationSpec getStreamingStateExpires() + { + return conf.streaming_state_expires; + } + + public static void setStreamingStateExpires(DurationSpec duration) + { + if (!conf.streaming_state_expires.equals(Objects.requireNonNull(duration, "duration"))) + { + logger.info("Setting streaming_state_expires to {}", duration); + conf.streaming_state_expires = duration; + } + } + + public static DataStorageSpec getStreamingStateSize() + { + return conf.streaming_state_size; + } + + public static void setStreamingStateSize(DataStorageSpec duration) + { + if (!conf.streaming_state_size.equals(Objects.requireNonNull(duration, "duration"))) + { + logger.info("Setting streaming_state_size to {}", duration); + conf.streaming_state_size = duration; + } + } } diff --git a/src/java/org/apache/cassandra/config/DurationSpec.java b/src/java/org/apache/cassandra/config/DurationSpec.java index f06c2477d1b8..e5edb3558a02 100644 --- a/src/java/org/apache/cassandra/config/DurationSpec.java +++ b/src/java/org/apache/cassandra/config/DurationSpec.java @@ -129,6 +129,19 @@ public DurationSpec(String value, TimeUnit minUnit) } } + // get vs no-get prefix is not consistent in the code base, but for classes involved with config parsing, it is + // imporant to be explicit about get/set as this changes how parsing is done; this class is a data-type, so is + // not nested, having get/set can confuse parsing thinking this is a nested type + public long quantity() + { + return quantity; + } + + public TimeUnit unit() + { + return unit; + } + /** * Creates a {@code DurationSpec} of the specified amount of milliseconds. * @@ -178,6 +191,17 @@ public static DurationSpec inHours(long hours) return new DurationSpec(hours, HOURS); } + /** + * Creates a {@code DurationSpec} of the specified amount of days. + * + * @param days the amount of days + * @return a duration + */ + public static DurationSpec inDays(long days) + { + return new DurationSpec(days, DAYS); + } + /** * Creates a {@code DurationSpec} of the specified amount of seconds. Custom method for special cases. * diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java index b20ac434ced4..cf90c423bccf 100644 --- a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java +++ b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java @@ -135,6 +135,12 @@ public void truncate() throw new InvalidRequestException("Truncation is not supported by table " + metadata); } + @Override + public String toString() + { + return metadata().toString(); + } + public interface DataSet { boolean isEmpty(); diff --git a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java index b8cb9f5c4d6e..4150783e3de2 100644 --- a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java +++ b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java @@ -176,8 +176,10 @@ private Row(TableMetadata metadata, Clustering clustering) private void add(String columnName, Object value) { ColumnMetadata column = metadata.getColumn(ByteBufferUtil.bytes(columnName)); - if (null == column || !column.isRegular()) - throw new IllegalArgumentException(); + if (column == null) + throw new IllegalArgumentException("Unknown column: " + columnName); + if (!column.isRegular()) + throw new IllegalArgumentException(String.format("Expect a regular column %s, but got %s", columnName, column.kind)); values.put(column, value); } diff --git a/src/java/org/apache/cassandra/db/virtual/StreamingVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/StreamingVirtualTable.java new file mode 100644 index 000000000000..1036f18426e1 --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/StreamingVirtualTable.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.util.Date; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamingState; + +import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse; + +public class StreamingVirtualTable extends AbstractVirtualTable +{ + public StreamingVirtualTable(String keyspace) + { + super(parse("CREATE TABLE streaming (" + + " id uuid,\n" + + " follower boolean,\n" + + " operation text, \n" + + " peers frozen>,\n" + + " status text,\n" + + " progress_percentage float,\n" + + " last_updated_at timestamp,\n" + + " duration_millis bigint,\n" + + " failure_cause text,\n" + + " success_message text,\n" + + "\n" + + StreamingState.Sessions.columns() + + "\n" + + stateColumns() + + "\n" + + "PRIMARY KEY ((id))" + + ")", keyspace) + .kind(TableMetadata.Kind.VIRTUAL) + .build()); + } + + private static String stateColumns() + { + StringBuilder sb = new StringBuilder(); + for (StreamingState.Status state : StreamingState.Status.values()) + sb.append(" status_").append(state.name().toLowerCase()).append("_timestamp timestamp,\n"); + return sb.toString(); + } + + @Override + public DataSet data() + { + SimpleDataSet result = new SimpleDataSet(metadata()); + StreamManager.instance.getStreamingStates() + .forEach(s -> updateDataSet(result, s)); + return result; + } + + @Override + public DataSet data(DecoratedKey partitionKey) + { + UUID id = UUIDType.instance.compose(partitionKey.getKey()); + SimpleDataSet result = new SimpleDataSet(metadata()); + StreamingState state = StreamManager.instance.getStreamingState(id); + if (state != null) + updateDataSet(result, state); + return result; + } + + private void updateDataSet(SimpleDataSet ds, StreamingState state) + { + ds.row(state.id()); + ds.column("last_updated_at", new Date(state.lastUpdatedAtMillis())); // read early to see latest state + ds.column("follower", state.follower()); + ds.column("operation", state.operation().getDescription()); + ds.column("peers", state.peers().stream().map(Object::toString).collect(Collectors.toList())); + ds.column("status", state.status().name().toLowerCase()); + ds.column("progress_percentage", round(state.progress() * 100)); + ds.column("duration_millis", state.durationMillis()); + ds.column("failure_cause", state.failureCause()); + ds.column("success_message", state.successMessage()); + for (Map.Entry e : state.stateTimesMillis().entrySet()) + ds.column("status_" + e.getKey().name().toLowerCase() + "_timestamp", new Date(e.getValue())); + + state.sessions().update(ds); + } + + static float round(float value) + { + return Math.round(value * 100) / 100; + } +} diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java index 6fe189e943e3..fc0f40a3ad6d 100644 --- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java @@ -45,6 +45,7 @@ private SystemViewsKeyspace() .add(new RolesCacheKeysTable(VIRTUAL_VIEWS)) .add(new CQLMetricsTable(VIRTUAL_VIEWS)) .add(new BatchMetricsTable(VIRTUAL_VIEWS)) + .add(new StreamingVirtualTable(VIRTUAL_VIEWS)) .build()); } } diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 82eadbc96b38..67195d7565d9 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -77,6 +77,7 @@ import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.security.ThreadAwareSecurityManager; +import org.apache.cassandra.streaming.StreamManager; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JMXServerUtils; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -370,6 +371,7 @@ protected void setup() ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance, 30, sizeRecorderInterval, TimeUnit.SECONDS); ActiveRepairService.instance.start(); + StreamManager.instance.start(); // Prepared statements QueryProcessor.instance.preloadPreparedStatements(); diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java index 31cff9b1e88a..55f1e4467f06 100644 --- a/src/java/org/apache/cassandra/streaming/SessionInfo.java +++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java @@ -44,8 +44,8 @@ public final class SessionInfo implements Serializable /** Current session state */ public final StreamSession.State state; - private final Map receivingFiles; - private final Map sendingFiles; + private final Map receivingFiles = new ConcurrentHashMap<>(); + private final Map sendingFiles = new ConcurrentHashMap<>(); public SessionInfo(InetSocketAddress peer, int sessionIndex, @@ -59,11 +59,14 @@ public SessionInfo(InetSocketAddress peer, this.connecting = connecting; this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries); this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries); - this.receivingFiles = new ConcurrentHashMap<>(); - this.sendingFiles = new ConcurrentHashMap<>(); this.state = state; } + public SessionInfo(SessionInfo other) + { + this(other.peer, other.sessionIndex, other.connecting, other.receivingSummaries, other.sendingSummaries, other.state); + } + public boolean isFailed() { return state == StreamSession.State.FAILED; diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java index bec112b1fa18..86b42ced1f3f 100644 --- a/src/java/org/apache/cassandra/streaming/StreamManager.java +++ b/src/java/org/apache/cassandra/streaming/StreamManager.java @@ -17,9 +17,11 @@ */ package org.apache.cassandra.streaming; +import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; import javax.management.ListenerNotFoundException; import javax.management.MBeanNotificationInfo; import javax.management.NotificationFilter; @@ -28,12 +30,18 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.util.concurrent.RateLimiter; import org.cliffc.high_scale_lib.NonBlockingHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.DurationSpec; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.management.StreamEventJMXNotifier; import org.apache.cassandra.streaming.management.StreamStateCompositeData; @@ -45,6 +53,8 @@ */ public class StreamManager implements StreamManagerMBean { + private static final Logger logger = LoggerFactory.getLogger(StreamManager.class); + public static final StreamManager instance = new StreamManager(); /** @@ -204,6 +214,7 @@ private static double calculateEffectiveRateInBytes(double throughput) } private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier(); + private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); /* * Currently running streams. Removed after completion/failure. @@ -213,6 +224,83 @@ private static double calculateEffectiveRateInBytes(double throughput) private final Map initiatorStreams = new NonBlockingHashMap<>(); private final Map followerStreams = new NonBlockingHashMap<>(); + private final Cache states; + private final StreamListener listener = new StreamListener() + { + @Override + public void onRegister(StreamResultFuture result) + { + // reason for synchronized rather than states.get is to detect duplicates + // streaming shouldn't be producing duplicates as that would imply a planId collision + synchronized (states) + { + StreamingState previous = states.getIfPresent(result.planId); + if (previous == null) + { + StreamingState state = new StreamingState(result); + states.put(state.id(), state); + state.phase.start(); + result.addEventListener(state); + } + else + { + logger.warn("Duplicate streaming states detected for id {}", result.planId); + } + } + } + }; + + public StreamManager() + { + DurationSpec duration = DatabaseDescriptor.getStreamingStateExpires(); + long sizeBytes = DatabaseDescriptor.getStreamingStateSize().toBytes(); + long numElements = sizeBytes / StreamingState.ELEMENT_SIZE; + logger.info("Storing streaming state for {} or for {} elements", duration, numElements); + states = CacheBuilder.newBuilder() + .expireAfterWrite(duration.quantity(), duration.unit()) + .maximumSize(numElements) + .build(); + } + + public void start() + { + addListener(listener); + } + + public void stop() + { + removeListener(listener); + } + + public Collection getStreamingStates() + { + return states.asMap().values(); + } + + public StreamingState getStreamingState(UUID id) + { + return states.getIfPresent(id); + } + + @VisibleForTesting + public void putStreamingState(StreamingState state) + { + synchronized (states) + { + StreamingState previous = states.getIfPresent(state.id()); + if (previous != null) + throw new AssertionError("StreamPlan id " + state.id() + " already exists"); + states.put(state.id(), state); + } + } + + @VisibleForTesting + public void clearStates() + { + // states.cleanUp() doesn't clear, it looks to only run gc on things that could be removed... this method should remove all state + states.asMap().clear(); + } + public Set getCurrentStreams() { return Sets.newHashSet(Iterables.transform(Iterables.concat(initiatorStreams.values(), followerStreams.values()), new Function() @@ -231,6 +319,7 @@ public void registerInitiator(final StreamResultFuture result) result.addListener(() -> initiatorStreams.remove(result.planId)); initiatorStreams.put(result.planId, result); + notifySafeOnRegister(result); } public StreamResultFuture registerFollower(final StreamResultFuture result) @@ -240,7 +329,51 @@ public StreamResultFuture registerFollower(final StreamResultFuture result) result.addListener(() -> followerStreams.remove(result.planId)); StreamResultFuture previous = followerStreams.putIfAbsent(result.planId, result); - return previous == null ? result : previous; + if (previous == null) + { + notifySafeOnRegister(result); + return result; + } + return previous; + } + + @VisibleForTesting + public void putInitiatorStream(StreamResultFuture future) + { + StreamResultFuture current = initiatorStreams.putIfAbsent(future.planId, future); + assert current == null: "Duplicat initiator stream for " + future.planId; + } + + @VisibleForTesting + public void putFollowerStream(StreamResultFuture future) + { + StreamResultFuture current = followerStreams.putIfAbsent(future.planId, future); + assert current == null: "Duplicate follower stream for " + future.planId; + } + + public void addListener(StreamListener listener) + { + listeners.add(listener); + } + + public void removeListener(StreamListener listener) + { + listeners.remove(listener); + } + + private void notifySafeOnRegister(StreamResultFuture result) + { + for (StreamListener l : listeners) + { + try + { + l.onRegister(result); + } + catch (Throwable t) + { + logger.warn("Failed to notify stream listener of new Initiator/Follower", t); + } + } } public StreamResultFuture getReceivingStream(UUID planId) @@ -248,6 +381,11 @@ public StreamResultFuture getReceivingStream(UUID planId) return followerStreams.get(planId); } + public StreamResultFuture getInitiatorStream(UUID planId) + { + return initiatorStreams.get(planId); + } + public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) { notifier.addNotificationListener(listener, filter, handback); @@ -282,4 +420,9 @@ private StreamSession findSession(Map streams, InetAdd return streamResultFuture.getSession(peer, sessionIndex); } + + public interface StreamListener + { + default void onRegister(StreamResultFuture result) {} + } } diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 66e99be388e5..8f11587d5559 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -62,7 +62,7 @@ public final class StreamResultFuture extends AsyncFuture * @param planId Stream plan ID * @param streamOperation Stream streamOperation */ - private StreamResultFuture(UUID planId, StreamOperation streamOperation, StreamCoordinator coordinator) + public StreamResultFuture(UUID planId, StreamOperation streamOperation, StreamCoordinator coordinator) { this.planId = planId; this.streamOperation = streamOperation; @@ -208,7 +208,16 @@ synchronized void fireStreamEvent(StreamEvent event) { // delegate to listener for (StreamEventHandler listener : eventListeners) - listener.handleStreamEvent(event); + { + try + { + listener.handleStreamEvent(event); + } + catch (Throwable t) + { + logger.warn("Unexpected exception in listern while calling handleStreamEvent", t); + } + } } private synchronized void maybeComplete() diff --git a/src/java/org/apache/cassandra/streaming/StreamingState.java b/src/java/org/apache/cassandra/streaming/StreamingState.java new file mode 100644 index 000000000000..b7ef61ed8e7e --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/StreamingState.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.virtual.SimpleDataSet; +import org.apache.cassandra.tools.nodetool.formatter.TableBuilder; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.ObjectSizes; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class StreamingState implements StreamEventHandler +{ + private static final Logger logger = LoggerFactory.getLogger(StreamingState.class); + + public static final long ELEMENT_SIZE = ObjectSizes.measureDeep(new StreamingState(UUID.randomUUID(), StreamOperation.OTHER, false)); + + public enum Status + {INIT, START, SUCCESS, FAILURE} + + private final long createdAtMillis = Clock.Global.currentTimeMillis(); + + // while streaming is running, this is a cache of StreamInfo seen with progress state + // the reason for the cache is that StreamSession drops data after tasks (recieve/send) complete, this makes + // it so that current state of a future tracks work pending rather than work done, cache solves this by not deleting + // when tasks complete + // To lower memory costs, clear this after the stream completes + private ConcurrentMap streamProgress = new ConcurrentHashMap<>(); + + private final UUID id; + private final boolean follower; + private final StreamOperation operation; + private Set peers = null; + private Sessions sessions = Sessions.EMPTY; + + private Status status; + private String completeMessage = null; + + private final long[] stateTimesNanos; + private volatile long lastUpdatedAtNanos; + + // API for state changes + public final Phase phase = new Phase(); + + public StreamingState(StreamResultFuture result) + { + this(result.planId, result.streamOperation, result.getCoordinator().isFollower()); + } + + private StreamingState(UUID planId, StreamOperation streamOperation, boolean follower) + { + this.id = planId; + this.operation = streamOperation; + this.follower = follower; + this.stateTimesNanos = new long[Status.values().length]; + updateState(Status.INIT); + } + + public UUID id() + { + return id; + } + + public boolean follower() + { + return follower; + } + + public StreamOperation operation() + { + return operation; + } + + public Set peers() + { + Set peers = this.peers; + if (peers != null) + return peers; + ConcurrentMap streamProgress = this.streamProgress; + if (streamProgress != null) + return streamProgress.keySet(); + return Collections.emptySet(); + } + + public Status status() + { + return status; + } + + public Sessions sessions() + { + return sessions; + } + + public boolean isComplete() + { + switch (status) + { + case SUCCESS: + case FAILURE: + return true; + default: + return false; + } + } + + public StreamResultFuture future() + { + if (follower) + return StreamManager.instance.getReceivingStream(id); + else + return StreamManager.instance.getInitiatorStream(id); + } + + public float progress() + { + switch (status) + { + case INIT: + return 0; + case START: + return Math.min(0.99f, sessions().progress().floatValue()); + case SUCCESS: + case FAILURE: + return 1; + default: + throw new AssertionError("unknown state: " + status); + } + } + + public EnumMap stateTimesMillis() + { + EnumMap map = new EnumMap<>(Status.class); + for (int i = 0; i < stateTimesNanos.length; i++) + { + long nanos = stateTimesNanos[i]; + if (nanos != 0) + map.put(Status.values()[i], nanosToMillis(nanos)); + } + return map; + } + + public long durationMillis() + { + long endNanos = lastUpdatedAtNanos; + if (!isComplete()) + endNanos = Clock.Global.nanoTime(); + return TimeUnit.NANOSECONDS.toMillis(endNanos - stateTimesNanos[0]); + } + + public long lastUpdatedAtMillis() + { + return nanosToMillis(lastUpdatedAtNanos); + } + + public long lastUpdatedAtNanos() + { + return lastUpdatedAtNanos; + } + + public String failureCause() + { + if (status == Status.FAILURE) + return completeMessage; + return null; + } + + public String successMessage() + { + if (status == Status.SUCCESS) + return completeMessage; + return null; + } + + @Override + public String toString() + { + TableBuilder table = new TableBuilder(); + table.add("id", id.toString()); + table.add("status", status().name().toLowerCase()); + table.add("progress", (progress() * 100) + "%"); + table.add("duration_ms", Long.toString(durationMillis())); + table.add("last_updated_ms", Long.toString(lastUpdatedAtMillis())); + table.add("failure_cause", failureCause()); + table.add("success_message", successMessage()); + for (Map.Entry e : stateTimesMillis().entrySet()) + table.add("status_" + e.getKey().name().toLowerCase() + "_ms", e.toString()); + return table.toString(); + } + + @Override + public synchronized void handleStreamEvent(StreamEvent event) + { + ConcurrentMap streamProgress = this.streamProgress; + if (streamProgress == null) + { + logger.warn("Got stream event {} after the stream completed", event.eventType); + return; + } + try + { + switch (event.eventType) + { + case STREAM_PREPARED: + streamPrepared((StreamEvent.SessionPreparedEvent) event); + break; + case STREAM_COMPLETE: + // currently not taking track of state, so ignore + break; + case FILE_PROGRESS: + streamProgress((StreamEvent.ProgressEvent) event); + break; + default: + logger.warn("Unknown stream event type: {}", event.eventType); + } + } + catch (Throwable t) + { + logger.warn("Unexpected exception handling stream event", t); + } + sessions = Sessions.create(streamProgress.values()); + lastUpdatedAtNanos = Clock.Global.nanoTime(); + } + + private void streamPrepared(StreamEvent.SessionPreparedEvent event) + { + SessionInfo session = new SessionInfo(event.session); + streamProgress.putIfAbsent(session.peer, session); + } + + private void streamProgress(StreamEvent.ProgressEvent event) + { + SessionInfo info = streamProgress.get(event.progress.peer); + if (info != null) + { + info.updateProgress(event.progress); + } + else + { + logger.warn("[Stream #{}} ID#{}] Recieved stream progress before prepare; peer={}", id, event.progress.sessionIndex, event.progress.peer); + } + } + + @Override + public synchronized void onSuccess(@Nullable StreamState state) + { + ConcurrentMap streamProgress = this.streamProgress; + if (streamProgress != null) + { + sessions = Sessions.create(streamProgress.values()); + peers = new HashSet<>(streamProgress.keySet()); + this.streamProgress = null; + updateState(Status.SUCCESS); + } + } + + @Override + public synchronized void onFailure(Throwable throwable) + { + ConcurrentMap streamProgress = this.streamProgress; + if (streamProgress != null) + { + sessions = Sessions.create(streamProgress.values()); + peers = new HashSet<>(streamProgress.keySet()); + this.streamProgress = null; + } + completeMessage = Throwables.getStackTraceAsString(throwable); + updateState(Status.FAILURE); + } + + private synchronized void updateState(Status state) + { + this.status = state; + long now = Clock.Global.nanoTime(); + stateTimesNanos[state.ordinal()] = now; + lastUpdatedAtNanos = now; + } + + private long nanosToMillis(long nanos) + { + // nanos - creationTimeNanos = delta since init + return createdAtMillis + TimeUnit.NANOSECONDS.toMillis(nanos - stateTimesNanos[0]); + } + + public class Phase + { + public void start() + { + updateState(Status.START); + } + } + + public static class Sessions + { + public static final Sessions EMPTY = new Sessions(0, 0, 0, 0, 0, 0, 0, 0); + + public final long bytesToReceive, bytesReceived; + public final long bytesToSend, bytesSent; + public final long filesToReceive, filesReceived; + public final long filesToSend, filesSent; + + public Sessions(long bytesToReceive, long bytesReceived, long bytesToSend, long bytesSent, long filesToReceive, long filesReceived, long filesToSend, long filesSent) + { + this.bytesToReceive = bytesToReceive; + this.bytesReceived = bytesReceived; + this.bytesToSend = bytesToSend; + this.bytesSent = bytesSent; + this.filesToReceive = filesToReceive; + this.filesReceived = filesReceived; + this.filesToSend = filesToSend; + this.filesSent = filesSent; + } + + public static String columns() + { + return " bytes_to_receive bigint, \n" + + " bytes_received bigint, \n" + + " bytes_to_send bigint, \n" + + " bytes_sent bigint, \n" + + " files_to_receive bigint, \n" + + " files_received bigint, \n" + + " files_to_send bigint, \n" + + " files_sent bigint, \n"; + } + + public static Sessions create(Collection sessions) + { + long bytesToReceive = 0; + long bytesReceived = 0; + long filesToReceive = 0; + long filesReceived = 0; + long bytesToSend = 0; + long bytesSent = 0; + long filesToSend = 0; + long filesSent = 0; + for (SessionInfo session : sessions) + { + bytesToReceive += session.getTotalSizeToReceive(); + bytesReceived += session.getTotalSizeReceived(); + + filesToReceive += session.getTotalFilesToReceive(); + filesReceived += session.getTotalFilesReceived(); + + bytesToSend += session.getTotalSizeToSend(); + bytesSent += session.getTotalSizeSent(); + + filesToSend += session.getTotalFilesToSend(); + filesSent += session.getTotalFilesSent(); + } + if (0 == bytesToReceive && 0 == bytesReceived && 0 == filesToReceive && 0 == filesReceived && 0 == bytesToSend && 0 == bytesSent && 0 == filesToSend && 0 == filesSent) + return EMPTY; + return new Sessions(bytesToReceive, bytesReceived, + bytesToSend, bytesSent, + filesToReceive, filesReceived, + filesToSend, filesSent); + } + + public boolean isEmpty() + { + return this == EMPTY; + } + + public BigDecimal receivedBytesPercent() + { + return div(bytesReceived, bytesToReceive); + } + + public BigDecimal sentBytesPercent() + { + return div(bytesSent, bytesToSend); + } + + public BigDecimal progress() + { + return div(bytesSent + bytesReceived, bytesToSend + bytesToReceive); + } + + private static BigDecimal div(long a, long b) + { + // not "correct" but its what you would do if this happened... + if (b == 0) + return BigDecimal.ZERO; + return BigDecimal.valueOf(a).divide(BigDecimal.valueOf(b), 4, RoundingMode.HALF_UP); + } + + public void update(SimpleDataSet ds) + { + if (isEmpty()) + return; + ds.column("bytes_to_receive", bytesToReceive) + .column("bytes_received", bytesReceived) + .column("bytes_to_send", bytesToSend) + .column("bytes_sent", bytesSent) + .column("files_to_receive", filesToReceive) + .column("files_received", filesReceived) + .column("files_to_send", filesToSend) + .column("files_sent", filesSent); + } + } +} diff --git a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java index e504dae591cf..b8c74874312d 100644 --- a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java +++ b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java @@ -87,7 +87,7 @@ public void onFailure(Throwable t) Notification notif = new Notification(StreamEvent.class.getCanonicalName() + ".failure", StreamManagerMBean.OBJECT_NAME, seq.getAndIncrement()); - notif.setUserData(t.fillInStackTrace().toString()); + notif.setUserData(t.toString()); sendNotification(notif); } } diff --git a/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java b/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java index 166ed3d87c75..025716b02dae 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java +++ b/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java @@ -18,7 +18,11 @@ package org.apache.cassandra.tools.nodetool.formatter; +import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.io.UncheckedIOException; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -116,6 +120,22 @@ public void printTo(PrintStream out) } } + @Override + public String toString() + { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + try (PrintStream stream = new PrintStream(os, true, StandardCharsets.UTF_8.displayName())) + { + printTo(stream); + stream.flush(); + return os.toString(StandardCharsets.UTF_8.displayName()); + } + catch (UnsupportedEncodingException e) + { + throw new UncheckedIOException(e); + } + } + /** * Share max offsets across multiple TableBuilders */ diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index e5a933dd605b..38cfef0c34f5 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -120,6 +120,7 @@ import org.apache.cassandra.service.StorageServiceMBean; import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings; import org.apache.cassandra.service.snapshot.SnapshotManager; +import org.apache.cassandra.streaming.StreamManager; import org.apache.cassandra.streaming.StreamReceiveTask; import org.apache.cassandra.streaming.StreamTransferTask; import org.apache.cassandra.streaming.async.NettyStreamingChannel; @@ -667,6 +668,7 @@ else if (cluster instanceof Cluster) throw new IllegalStateException(String.format("%s != %s", FBUtilities.getBroadcastAddressAndPort(), broadcastAddress())); ActiveRepairService.instance.start(); + StreamManager.instance.start(); CassandraDaemon.getInstanceForTesting().completeSetup(); } catch (Throwable t) @@ -740,6 +742,7 @@ public Future shutdown(boolean graceful) NettyStreamingChannel::shutdown, () -> StreamReceiveTask.shutdownAndWait(1L, MINUTES), () -> StreamTransferTask.shutdownAndWait(1L, MINUTES), + () -> StreamManager.instance.stop(), () -> SecondaryIndexManager.shutdownAndWait(1L, MINUTES), () -> IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES), () -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES), diff --git a/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java new file mode 100644 index 000000000000..db896697fbde --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.test.streaming; + +import java.io.IOException; +import java.util.Collections; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.Row; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.distributed.util.QueryResultUtil; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RebuildStreamingTest extends TestBaseImpl +{ + @Test + public void test() throws IOException + { + try (Cluster cluster = init(Cluster.build(2) + .withConfig(c -> c.with(Feature.values()).set("stream_entire_sstables", false)) + .start())) + { + cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id varchar PRIMARY KEY);")); + cluster.stream().forEach(i -> i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success()); + IInvokableInstance first = cluster.get(1); + IInvokableInstance second = cluster.get(2); + long expectedFiles = 10; + for (int i = 0; i < expectedFiles; i++) + { + first.executeInternal(withKeyspace("insert into %s.users(user_id) values (?)"), "dcapwell" + i); + first.flush(KEYSPACE); + } + + second.nodetoolResult("rebuild", "--keyspace", KEYSPACE).asserts().success(); + + SimpleQueryResult qr = first.executeInternalWithResult("SELECT * FROM system_views.streaming"); + String txt = QueryResultUtil.expand(qr); + qr.reset(); + assertThat(qr.toObjectArrays().length).describedAs("Found rows\n%s", txt).isEqualTo(1); + assertThat(qr.hasNext()).isTrue(); + Row row = qr.next(); + QueryResultUtil.assertThat(row) + .isEqualTo("peers", Collections.singletonList("/127.0.0.2:7012")) + .isEqualTo("follower", true) + .isEqualTo("operation", "Rebuild") + .isEqualTo("status", "success") + .isEqualTo("progress_percentage", 100.0F) + .isEqualTo("success_message", null).isEqualTo("failure_cause", null) + .isEqualTo("files_sent", expectedFiles) + .columnsEqualTo("files_sent", "files_to_send") + .columnsEqualTo("bytes_sent", "bytes_to_send") + .isEqualTo("files_received", 0L) + .columnsEqualTo("files_received", "files_to_receive", "bytes_received", "bytes_to_receive"); + long totalBytes = row.getLong("bytes_sent"); + assertThat(totalBytes).isGreaterThan(0); + + qr = second.executeInternalWithResult("SELECT * FROM system_views.streaming"); + txt = QueryResultUtil.expand(qr); + qr.reset(); + assertThat(qr.toObjectArrays().length).describedAs("Found rows\n%s", txt).isEqualTo(1); + assertThat(qr.hasNext()).isTrue(); + + QueryResultUtil.assertThat(qr.next()) + .isEqualTo("peers", Collections.singletonList("/127.0.0.1:7012")) + .isEqualTo("follower", false) + .isEqualTo("operation", "Rebuild") + .isEqualTo("status", "success") + .isEqualTo("progress_percentage", 100.0F) + .isEqualTo("success_message", null).isEqualTo("failure_cause", null) + .columnsEqualTo("files_to_receive", "files_received").isEqualTo("files_received", expectedFiles) + .columnsEqualTo("bytes_to_receive", "bytes_received").isEqualTo("bytes_received", totalBytes) + .columnsEqualTo("files_sent", "files_to_send", "bytes_sent", "bytes_to_send").isEqualTo("files_sent", 0L); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java index f9c1d90bf13e..58842bc6913b 100644 --- a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java +++ b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java @@ -23,6 +23,7 @@ import org.apache.cassandra.distributed.api.Row; import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.tools.nodetool.formatter.TableBuilder; import org.assertj.core.api.Assertions; public class QueryResultUtil @@ -69,21 +70,70 @@ private static boolean equals(Row a, Object[] bs) return true; } - public static AssertHelper assertThat(SimpleQueryResult qr) + public static SimpleQueryResultAssertHelper assertThat(SimpleQueryResult qr) { - return new AssertHelper(qr); + return new SimpleQueryResultAssertHelper(qr); } - public static class AssertHelper + public static RowAssertHelper assertThat(Row row) + { + return new RowAssertHelper(row); + } + + public static String expand(SimpleQueryResult qr) + { + StringBuilder sb = new StringBuilder(); + int rowNum = 1; + while (qr.hasNext()) + { + sb.append("@ Row ").append(rowNum).append('\n'); + TableBuilder table = new TableBuilder('|'); + Row next = qr.next(); + for (String column : qr.names()) + { + Object value = next.get(column); + table.add(column, value == null ? null : value.toString()); + } + sb.append(table); + } + return sb.toString(); + } + + public static class RowAssertHelper + { + private final Row row; + + public RowAssertHelper(Row row) + { + this.row = row; + } + + public RowAssertHelper isEqualTo(String column, Object expected) + { + Object actual = row.get(column); + Assertions.assertThat(actual).describedAs("Column %s had unexpected value", column).isEqualTo(expected); + return this; + } + + public RowAssertHelper columnsEqualTo(String first, String... others) + { + Object expected = row.get(first); + for (String other : others) + Assertions.assertThat(row.get(other)).describedAs("Columns %s and %s are not equal", first, other).isEqualTo(expected); + return this; + } + } + + public static class SimpleQueryResultAssertHelper { private final SimpleQueryResult qr; - private AssertHelper(SimpleQueryResult qr) + private SimpleQueryResultAssertHelper(SimpleQueryResult qr) { this.qr = qr; } - public AssertHelper contains(Object... values) + public SimpleQueryResultAssertHelper contains(Object... values) { qr.reset(); if (!QueryResultUtil.contains(qr, a -> QueryResultUtil.equals(a, values))) @@ -91,7 +141,7 @@ public AssertHelper contains(Object... values) return this; } - public AssertHelper contains(Row row) + public SimpleQueryResultAssertHelper contains(Row row) { qr.reset(); if (!QueryResultUtil.contains(qr, a -> QueryResultUtil.equals(a, row))) @@ -99,7 +149,7 @@ public AssertHelper contains(Row row) return this; } - public AssertHelper contains(Predicate fn) + public SimpleQueryResultAssertHelper contains(Predicate fn) { qr.reset(); if (!QueryResultUtil.contains(qr, fn)) @@ -107,7 +157,7 @@ public AssertHelper contains(Predicate fn) return this; } - public AssertHelper isEqualTo(Object... values) + public SimpleQueryResultAssertHelper isEqualTo(Object... values) { Assertions.assertThat(qr.toObjectArrays()) .hasSize(1) @@ -115,13 +165,13 @@ public AssertHelper isEqualTo(Object... values) return this; } - public AssertHelper hasSize(int size) + public SimpleQueryResultAssertHelper hasSize(int size) { Assertions.assertThat(qr.toObjectArrays()).hasSize(size); return this; } - public AssertHelper hasSizeGreaterThan(int size) + public SimpleQueryResultAssertHelper hasSizeGreaterThan(int size) { Assertions.assertThat(qr.toObjectArrays()).hasSizeGreaterThan(size); return this; diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 6844cb8e15e7..66e9acf4cc47 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -1442,6 +1442,7 @@ public static void assertRows(UntypedResultSet result, Object[]... rows) Assert.assertEquals(String.format("Invalid number of (expected) values provided for row %d", i), expected == null ? 1 : expected.length, meta.size()); + StringBuilder error = new StringBuilder(); for (int j = 0; j < meta.size(); j++) { ColumnSpecification column = meta.get(j); @@ -1454,15 +1455,17 @@ public static void assertRows(UntypedResultSet result, Object[]... rows) { Object actualValueDecoded = actualValue == null ? null : column.type.getSerializer().deserialize(actualValue); if (!Objects.equal(expected != null ? expected[j] : null, actualValueDecoded)) - Assert.fail(String.format("Invalid value for row %d column %d (%s of type %s), expected <%s> but got <%s>", - i, - j, - column.name, - column.type.asCQL3Type(), - formatValue(expectedByteValue != null ? expectedByteValue.duplicate() : null, column.type), - formatValue(actualValue, column.type))); + error.append(String.format("Invalid value for row %d column %d (%s of type %s), expected <%s> but got <%s>", + i, + j, + column.name, + column.type.asCQL3Type(), + formatValue(expectedByteValue != null ? expectedByteValue.duplicate() : null, column.type), + formatValue(actualValue, column.type))).append("\n"); } } + if (error.length() > 0) + Assert.fail(error.toString()); i++; } diff --git a/test/unit/org/apache/cassandra/db/MmapFileTest.java b/test/unit/org/apache/cassandra/db/MmapFileTest.java index a6664264cfa9..4619df3b7d58 100644 --- a/test/unit/org/apache/cassandra/db/MmapFileTest.java +++ b/test/unit/org/apache/cassandra/db/MmapFileTest.java @@ -25,14 +25,24 @@ import javax.management.ObjectName; import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.File; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.io.util.FileUtils; public class MmapFileTest { + @BeforeClass + public static void setup() + { + // PathUtils touches StorageService which touches StreamManager which requires configs be setup + DatabaseDescriptor.daemonInitialization(); + } + + /** * Verifies that {@link sun.misc.Cleaner} works and that mmap'd files can be deleted. */ diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java index 3e28040d2964..d28d0dc552ec 100644 --- a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java +++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java @@ -92,6 +92,8 @@ public static Collection data() @BeforeClass public static void beforeClass() { + // Gossiper touches StorageService which touches StreamManager which requires configs be setup + DatabaseDescriptor.daemonInitialization(); DatabaseDescriptor.setSeedProvider(Arrays::asList); DatabaseDescriptor.setEndpointSnitch(new SimpleSnitch()); DatabaseDescriptor.setDefaultFailureDetector(); diff --git a/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java new file mode 100644 index 000000000000..270305a8fc1c --- /dev/null +++ b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.SessionInfo; +import org.apache.cassandra.streaming.StreamCoordinator; +import org.apache.cassandra.streaming.StreamEvent; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.StreamResultFuture; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.streaming.StreamSummary; +import org.apache.cassandra.streaming.StreamingChannel; +import org.apache.cassandra.streaming.StreamingState; +import org.apache.cassandra.utils.FBUtilities; +import org.assertj.core.util.Throwables; + +public class StreamingVirtualTableTest extends CQLTester +{ + private static final String KS_NAME = "vts"; + private static final InetAddressAndPort PEER1 = address(127, 0, 0, 1); + private static final InetAddressAndPort PEER2 = address(127, 0, 0, 2); + private static final InetAddressAndPort PEER3 = address(127, 0, 0, 3); + private static String TABLE_NAME; + + @BeforeClass + public static void setup() + { + CQLTester.setUpClass(); + StreamingVirtualTable table = new StreamingVirtualTable(KS_NAME); + TABLE_NAME = table.toString(); + VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table))); + } + + @Before + public void clearState() + { + StreamManager.instance.clearStates(); + } + + @Test + public void empty() throws Throwable + { + assertEmpty(execute(t("select * from %s"))); + } + + @Test + public void single() throws Throwable + { + StreamingState state = stream(true); + assertRows(execute(t("select id, follower, operation, peers, status, progress_percentage, last_updated_at, failure_cause, success_message from %s")), + new Object[] { state.id(), true, "Repair", Collections.emptyList(), "init", 0F, new Date(state.lastUpdatedAtMillis()), null, null }); + + state.phase.start(); + assertRows(execute(t("select id, follower, operation, peers, status, progress_percentage, last_updated_at, failure_cause, success_message from %s")), + new Object[] { state.id(), true, "Repair", Collections.emptyList(), "start", 0F, new Date(state.lastUpdatedAtMillis()), null, null }); + + state.handleStreamEvent(new StreamEvent.SessionPreparedEvent(state.id(), new SessionInfo(PEER2, 1, PEER1, Collections.emptyList(), Collections.emptyList(), StreamSession.State.PREPARING))); + + state.onSuccess(new StreamState(state.id(), StreamOperation.REPAIR, ImmutableSet.of(new SessionInfo(PEER2, 1, PEER1, Collections.emptyList(), Collections.emptyList(), StreamSession.State.COMPLETE)))); + assertRows(execute(t("select id, follower, operation, peers, status, progress_percentage, last_updated_at, failure_cause, success_message from %s")), + new Object[] { state.id(), true, "Repair", Arrays.asList(address(127, 0, 0, 2).toString()), "success", 100F, new Date(state.lastUpdatedAtMillis()), null, null }); + } + + @Test + public void progressInitiator() throws Throwable + { + progress(false); + } + + @Test + public void progressFollower() throws Throwable + { + progress(true); + } + + public void progress(boolean follower) throws Throwable + { + StreamingState state = stream(follower); + StreamResultFuture future = state.future(); + state.phase.start(); + + SessionInfo s1 = new SessionInfo(PEER2, 0, FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()), Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING); + SessionInfo s2 = new SessionInfo(PEER3, 0, FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()), Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING); + + state.handleStreamEvent(new StreamEvent.SessionPreparedEvent(state.id(), s1)); + state.handleStreamEvent(new StreamEvent.SessionPreparedEvent(state.id(), s2)); + + long bytesToReceive = 0, bytesToSend = 0; + long filesToReceive = 0, filesToSend = 0; + for (SessionInfo s : Arrays.asList(s1, s2)) + { + bytesToReceive += s.getTotalSizeToReceive(); + bytesToSend += s.getTotalSizeToSend(); + filesToReceive += s.getTotalFilesToReceive(); + filesToSend += s.getTotalFilesToSend(); + } + assertRows(execute(t("select id, follower, peers, status, progress_percentage, bytes_to_receive, bytes_received, bytes_to_send, bytes_sent, files_to_receive, files_received, files_to_send, files_sent from %s")), + new Object[] { state.id(), follower, Arrays.asList(PEER2.toString(), PEER3.toString()), "start", 0F, bytesToReceive, 0L, bytesToSend, 0L, filesToReceive, 0L, filesToSend, 0L }); + + // update progress + long bytesReceived = 0, bytesSent = 0; + for (SessionInfo s : Arrays.asList(s1, s2)) + { + long in = s.getTotalFilesToReceive() - 1; + long inBytes = s.getTotalSizeToReceive() - in; + long out = s.getTotalFilesToSend() - 1; + long outBytes = s.getTotalSizeToSend() - out; + state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort) s.peer, 0, "0", ProgressInfo.Direction.IN, inBytes, inBytes))); + state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort) s.peer, 0, "0", ProgressInfo.Direction.OUT, outBytes, outBytes))); + bytesReceived += inBytes; + bytesSent += outBytes; + } + + assertRows(execute(t("select id, follower, peers, status, bytes_to_receive, bytes_received, bytes_to_send, bytes_sent, files_to_receive, files_received, files_to_send, files_sent from %s")), + new Object[] { state.id(), follower, Arrays.asList(PEER2.toString(), PEER3.toString()), "start", bytesToReceive, bytesReceived, bytesToSend, bytesSent, filesToReceive, 2L, filesToSend, 2L }); + + // finish + for (SessionInfo s : Arrays.asList(s1, s2)) + { + // complete the rest + for (long i = 1; i < s.getTotalFilesToReceive(); i++) + state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort) s.peer, 0, Long.toString(i), ProgressInfo.Direction.IN, 1, 1))); + for (long i = 1; i < s.getTotalFilesToSend(); i++) + state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort) s.peer, 0, Long.toString(i), ProgressInfo.Direction.OUT, 1, 1))); + } + + assertRows(execute(t("select id, follower, peers, status, progress_percentage, bytes_to_receive, bytes_received, bytes_to_send, bytes_sent, files_to_receive, files_received, files_to_send, files_sent from %s")), + new Object[] { state.id(), follower, Arrays.asList(PEER2.toString(), PEER3.toString()), "start", 99F, bytesToReceive, bytesToReceive, bytesToSend, bytesToSend, filesToReceive, filesToReceive, filesToSend, filesToSend }); + + state.onSuccess(future.getCurrentState()); + assertRows(execute(t("select id, follower, peers, status, progress_percentage, last_updated_at, failure_cause, success_message from %s")), + new Object[] { state.id(), follower, Arrays.asList(PEER2.toString(), PEER3.toString()), "success", 100F, new Date(state.lastUpdatedAtMillis()), null, null }); + } + + private static StreamSummary streamSummary() + { + int files = ThreadLocalRandom.current().nextInt(2, 10); + return new StreamSummary(TableId.fromUUID(UUID.randomUUID()), files, files * 1024); + } + + @Test + public void failed() throws Throwable + { + StreamingState state = stream(true); + RuntimeException t = new RuntimeException("You failed!"); + state.onFailure(t); + assertRows(execute(t("select id, follower, peers, status, progress_percentage, last_updated_at, failure_cause, success_message from %s")), + new Object[] { state.id(), true, Collections.emptyList(), "failure", 100F, new Date(state.lastUpdatedAtMillis()), Throwables.getStackTrace(t), null }); + } + + private static String t(String query) + { + return String.format(query, TABLE_NAME); + } + + private static StreamingState stream(boolean follower) + { + StreamResultFuture future = new StreamResultFuture(UUID.randomUUID(), StreamOperation.REPAIR, new StreamCoordinator(StreamOperation.REPAIR, 0, StreamingChannel.Factory.Global.streamingFactory(), follower, false, null, null) { + // initiator requires active sessions exist, else the future becomes success right away. + @Override + public synchronized boolean hasActiveSessions() + { + return true; + } + }); + StreamingState state = new StreamingState(future); + if (follower) StreamManager.instance.putFollowerStream(future); + else StreamManager.instance.putInitiatorStream(future); + StreamManager.instance.putStreamingState(state); + future.addEventListener(state); + return state; + } + + private static InetAddressAndPort address(int a, int b, int c, int d) + { + try + { + return InetAddressAndPort.getByAddress(new byte[] {(byte) a, (byte) b, (byte) c, (byte) d}); + } + catch (UnknownHostException e) + { + throw new AssertionError(e); + } + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/io/util/FileTest.java b/test/unit/org/apache/cassandra/io/util/FileTest.java index 0fb415ab20be..d7340db1ca93 100644 --- a/test/unit/org/apache/cassandra/io/util/FileTest.java +++ b/test/unit/org/apache/cassandra/io/util/FileTest.java @@ -33,6 +33,7 @@ import org.junit.Assert; import org.junit.Test; +import org.apache.cassandra.config.DatabaseDescriptor; import org.psjava.util.Triple; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -51,6 +52,9 @@ public class FileTest dir = new java.io.File(parent, dirName); //checkstyle: permit this instantiation dir.mkdirs(); new File(dir).deleteRecursiveOnExit(); + + // PathUtils touches StorageService which touches StreamManager which requires configs be setup + DatabaseDescriptor.daemonInitialization(); } diff --git a/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java b/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java index b70e51436b85..11efc3c2c394 100644 --- a/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java +++ b/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java @@ -29,10 +29,12 @@ import org.junit.After; import org.junit.Assume; +import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.StartupChecksOptions; import org.apache.cassandra.exceptions.StartupException; import org.apache.cassandra.io.util.File; @@ -175,6 +177,13 @@ private void delete(File file) file.delete(); } + @BeforeClass + public static void setupConfig() + { + // PathUtils touches StorageService which touches StreamManager which requires configs be setup + DatabaseDescriptor.daemonInitialization(); + } + @After public void teardown() throws IOException { diff --git a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java index 86b63a2d425f..5227e51b51dc 100644 --- a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java +++ b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java @@ -28,9 +28,11 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.File; import org.junit.After; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import net.openhft.chronicle.queue.ChronicleQueue; @@ -60,6 +62,13 @@ public static Path tempDir() throws Exception private BinLog binLog; private Path path; + @BeforeClass + public static void setup() + { + // PathUtils touches StorageService which touches StreamManager which requires configs be setup + DatabaseDescriptor.daemonInitialization(); + } + @Before public void setUp() throws Exception {