Skip to content

Commit

Permalink
Introduce RocksDbSegmentIdentifier to avoid changing the storege plug… (
Browse files Browse the repository at this point in the history
hyperledger#3755)

* Introduce RocksDbSegmentIdentifier to avoid changing the storege plugin interface

Signed-off-by: Fabio Di Fabio <[email protected]>
  • Loading branch information
fab-10 authored May 4, 2022
1 parent fb25cdb commit 0fdd55d
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 82 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.plugin.services.storage.rocksdb;

import org.hyperledger.besu.plugin.services.exception.StorageException;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.TransactionDB;

public class RocksDbSegmentIdentifier {

private final TransactionDB db;
private final AtomicReference<ColumnFamilyHandle> reference;

public RocksDbSegmentIdentifier(
final TransactionDB db, final ColumnFamilyHandle columnFamilyHandle) {
this.db = db;
this.reference = new AtomicReference<>(columnFamilyHandle);
}

public void reset() {
reference.getAndUpdate(
oldHandle -> {
try {
ColumnFamilyDescriptor descriptor =
new ColumnFamilyDescriptor(
oldHandle.getName(), oldHandle.getDescriptor().getOptions());
db.dropColumnFamily(oldHandle);
ColumnFamilyHandle newHandle = db.createColumnFamily(descriptor);
oldHandle.close();
return newHandle;
} catch (final RocksDBException e) {
throw new StorageException(e);
}
});
}

public ColumnFamilyHandle get() {
return reference.get();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RocksDbSegmentIdentifier that = (RocksDbSegmentIdentifier) o;
return Objects.equals(reference.get(), that.reference.get());
}

@Override
public int hashCode() {
return reference.get().hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbKeyIterator;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbUtil;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfiguration;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorage;
Expand All @@ -36,7 +37,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -61,7 +61,7 @@
import org.slf4j.LoggerFactory;

public class RocksDBColumnarKeyValueStorage
implements SegmentedKeyValueStorage<ColumnFamilyHandle> {
implements SegmentedKeyValueStorage<RocksDbSegmentIdentifier> {

private static final Logger LOG = LoggerFactory.getLogger(RocksDBColumnarKeyValueStorage.class);
private static final String DEFAULT_COLUMN = "default";
Expand All @@ -75,7 +75,7 @@ public class RocksDBColumnarKeyValueStorage
private final TransactionDBOptions txOptions;
private final TransactionDB db;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Map<String, AtomicReference<ColumnFamilyHandle>> columnHandlesByName;
private final Map<String, RocksDbSegmentIdentifier> columnHandlesByName;
private final RocksDBMetrics metrics;
private final WriteOptions tryDeleteOptions = new WriteOptions().setNoSlowdown(true);

Expand Down Expand Up @@ -128,14 +128,13 @@ public RocksDBColumnarKeyValueStorage(
Collectors.toMap(
segment -> Bytes.wrap(segment.getId()), SegmentIdentifier::getName));

final ImmutableMap.Builder<String, AtomicReference<ColumnFamilyHandle>> builder =
ImmutableMap.builder();
final ImmutableMap.Builder<String, RocksDbSegmentIdentifier> builder = ImmutableMap.builder();

for (ColumnFamilyHandle columnHandle : columnHandles) {
final String segmentName =
requireNonNullElse(
segmentsById.get(Bytes.wrap(columnHandle.getName())), DEFAULT_COLUMN);
builder.put(segmentName, new AtomicReference<>(columnHandle));
builder.put(segmentName, new RocksDbSegmentIdentifier(db, columnHandle));
}
columnHandlesByName = builder.build();

Expand All @@ -150,42 +149,41 @@ private BlockBasedTableConfig createBlockBasedTableConfig(final RocksDBConfigura
}

@Override
public AtomicReference<ColumnFamilyHandle> getSegmentIdentifierByName(
final SegmentIdentifier segment) {
public RocksDbSegmentIdentifier getSegmentIdentifierByName(final SegmentIdentifier segment) {
return columnHandlesByName.get(segment.getName());
}

@Override
public Optional<byte[]> get(final ColumnFamilyHandle segment, final byte[] key)
public Optional<byte[]> get(final RocksDbSegmentIdentifier segment, final byte[] key)
throws StorageException {
throwIfClosed();

try (final OperationTimer.TimingContext ignored = metrics.getReadLatency().startTimer()) {
return Optional.ofNullable(db.get(segment, key));
return Optional.ofNullable(db.get(segment.get(), key));
} catch (final RocksDBException e) {
throw new StorageException(e);
}
}

@Override
public Transaction<ColumnFamilyHandle> startTransaction() throws StorageException {
public Transaction<RocksDbSegmentIdentifier> startTransaction() throws StorageException {
throwIfClosed();
final WriteOptions writeOptions = new WriteOptions();
return new SegmentedKeyValueStorageTransactionTransitionValidatorDecorator<>(
new RocksDbTransaction(db.beginTransaction(writeOptions), writeOptions));
}

@Override
public Stream<byte[]> streamKeys(final ColumnFamilyHandle segmentHandle) {
final RocksIterator rocksIterator = db.newIterator(segmentHandle);
public Stream<byte[]> streamKeys(final RocksDbSegmentIdentifier segmentHandle) {
final RocksIterator rocksIterator = db.newIterator(segmentHandle.get());
rocksIterator.seekToFirst();
return RocksDbKeyIterator.create(rocksIterator).toStream();
}

@Override
public boolean tryDelete(final ColumnFamilyHandle segmentHandle, final byte[] key) {
public boolean tryDelete(final RocksDbSegmentIdentifier segmentHandle, final byte[] key) {
try {
db.delete(segmentHandle, tryDeleteOptions, key);
db.delete(segmentHandle.get(), tryDeleteOptions, key);
return true;
} catch (RocksDBException e) {
if (e.getStatus().getCode() == Status.Code.Incomplete) {
Expand All @@ -198,33 +196,17 @@ public boolean tryDelete(final ColumnFamilyHandle segmentHandle, final byte[] ke

@Override
public Set<byte[]> getAllKeysThat(
final ColumnFamilyHandle segmentHandle, final Predicate<byte[]> returnCondition) {
final RocksDbSegmentIdentifier segmentHandle, final Predicate<byte[]> returnCondition) {
return streamKeys(segmentHandle).filter(returnCondition).collect(toUnmodifiableSet());
}

@Override
public void clear(final ColumnFamilyHandle segmentHandle) {

var entry =
columnHandlesByName.values().stream().filter(e -> e.get().equals(segmentHandle)).findAny();

if (entry.isPresent()) {
AtomicReference<ColumnFamilyHandle> segmentHandleRef = entry.get();
segmentHandleRef.getAndUpdate(
oldHandle -> {
try {
ColumnFamilyDescriptor descriptor =
new ColumnFamilyDescriptor(
segmentHandle.getName(), segmentHandle.getDescriptor().getOptions());
db.dropColumnFamily(oldHandle);
ColumnFamilyHandle newHandle = db.createColumnFamily(descriptor);
segmentHandle.close();
return newHandle;
} catch (final RocksDBException e) {
throw new StorageException(e);
}
});
}
public void clear(final RocksDbSegmentIdentifier segmentHandle) {

columnHandlesByName.values().stream()
.filter(e -> e.equals(segmentHandle))
.findAny()
.ifPresent(segmentIdentifier -> segmentIdentifier.reset());
}

@Override
Expand All @@ -234,7 +216,7 @@ public void close() {
options.close();
tryDeleteOptions.close();
columnHandlesByName.values().stream()
.map(AtomicReference::get)
.map(RocksDbSegmentIdentifier::get)
.forEach(ColumnFamilyHandle::close);
db.close();
}
Expand All @@ -247,7 +229,7 @@ private void throwIfClosed() {
}
}

private class RocksDbTransaction implements Transaction<ColumnFamilyHandle> {
private class RocksDbTransaction implements Transaction<RocksDbSegmentIdentifier> {

private final org.rocksdb.Transaction innerTx;
private final WriteOptions options;
Expand All @@ -258,9 +240,9 @@ private class RocksDbTransaction implements Transaction<ColumnFamilyHandle> {
}

@Override
public void put(final ColumnFamilyHandle segment, final byte[] key, final byte[] value) {
public void put(final RocksDbSegmentIdentifier segment, final byte[] key, final byte[] value) {
try (final OperationTimer.TimingContext ignored = metrics.getWriteLatency().startTimer()) {
innerTx.put(segment, key, value);
innerTx.put(segment.get(), key, value);
} catch (final RocksDBException e) {
if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
LOG.error(e.getMessage());
Expand All @@ -271,9 +253,9 @@ public void put(final ColumnFamilyHandle segment, final byte[] key, final byte[]
}

@Override
public void remove(final ColumnFamilyHandle segment, final byte[] key) {
public void remove(final RocksDbSegmentIdentifier segment, final byte[] key) {
try (final OperationTimer.TimingContext ignored = metrics.getRemoveLatency().startTimer()) {
innerTx.delete(segment, key);
innerTx.delete(segment.get(), key);
} catch (final RocksDBException e) {
if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
LOG.error(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetricsFactory;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDbSegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBConfigurationBuilder;
import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.RocksDBColumnarKeyValueStorage;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorage;
Expand All @@ -31,13 +32,11 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.ColumnFamilyHandle;

public class RocksDBColumnarKeyValueStorageTest extends AbstractKeyValueStorageTest {

Expand All @@ -48,50 +47,49 @@ public void assertClear() throws Exception {
final byte[] key = bytesFromHexString("0001");
final byte[] val1 = bytesFromHexString("0FFF");
final byte[] val2 = bytesFromHexString("1337");
final SegmentedKeyValueStorage<ColumnFamilyHandle> store = createSegmentedStore();
AtomicReference<ColumnFamilyHandle> segment = store.getSegmentIdentifierByName(TestSegment.FOO);
final SegmentedKeyValueStorage<RocksDbSegmentIdentifier> store = createSegmentedStore();
RocksDbSegmentIdentifier segment = store.getSegmentIdentifierByName(TestSegment.FOO);
KeyValueStorage duplicateSegmentRef =
new SegmentedKeyValueStorageAdapter<>(TestSegment.FOO, store);

final Consumer<byte[]> insert =
value -> {
final Transaction<ColumnFamilyHandle> tx = store.startTransaction();
tx.put(segment.get(), key, value);
final Transaction<RocksDbSegmentIdentifier> tx = store.startTransaction();
tx.put(segment, key, value);
tx.commit();
};

// insert val:
insert.accept(val1);
assertThat(store.get(segment.get(), key).orElse(null)).isEqualTo(val1);
assertThat(store.get(segment, key).orElse(null)).isEqualTo(val1);
assertThat(duplicateSegmentRef.get(key).orElse(null)).isEqualTo(val1);

// clear and assert empty:
store.clear(segment.get());
assertThat(store.get(segment.get(), key)).isEmpty();
store.clear(segment);
assertThat(store.get(segment, key)).isEmpty();
assertThat(duplicateSegmentRef.get(key)).isEmpty();

// insert into empty:
insert.accept(val2);
assertThat(store.get(segment.get(), key).orElse(null)).isEqualTo(val2);
assertThat(store.get(segment, key).orElse(null)).isEqualTo(val2);
assertThat(duplicateSegmentRef.get(key).orElse(null)).isEqualTo(val2);

store.close();
}

@Test
public void twoSegmentsAreIndependent() throws Exception {
final SegmentedKeyValueStorage<ColumnFamilyHandle> store = createSegmentedStore();
final SegmentedKeyValueStorage<RocksDbSegmentIdentifier> store = createSegmentedStore();

final Transaction<ColumnFamilyHandle> tx = store.startTransaction();
final Transaction<RocksDbSegmentIdentifier> tx = store.startTransaction();
tx.put(
store.getSegmentIdentifierByName(TestSegment.BAR).get(),
store.getSegmentIdentifierByName(TestSegment.BAR),
bytesFromHexString("0001"),
bytesFromHexString("0FFF"));
tx.commit();

final Optional<byte[]> result =
store.get(
store.getSegmentIdentifierByName(TestSegment.FOO).get(), bytesFromHexString("0001"));
store.get(store.getSegmentIdentifierByName(TestSegment.FOO), bytesFromHexString("0001"));

assertThat(result).isEmpty();

Expand All @@ -103,11 +101,11 @@ public void canRemoveThroughSegmentIteration() throws Exception {
// we're looping this in order to catch intermittent failures when rocksdb objects are not close
// properly
for (int i = 0; i < 50; i++) {
final SegmentedKeyValueStorage<ColumnFamilyHandle> store = createSegmentedStore();
final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO).get();
final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR).get();
final SegmentedKeyValueStorage<RocksDbSegmentIdentifier> store = createSegmentedStore();
final RocksDbSegmentIdentifier fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO);
final RocksDbSegmentIdentifier barSegment = store.getSegmentIdentifierByName(TestSegment.BAR);

final Transaction<ColumnFamilyHandle> tx = store.startTransaction();
final Transaction<RocksDbSegmentIdentifier> tx = store.startTransaction();
tx.put(fooSegment, bytesOf(1), bytesOf(1));
tx.put(fooSegment, bytesOf(2), bytesOf(2));
tx.put(fooSegment, bytesOf(3), bytesOf(3));
Expand All @@ -129,7 +127,7 @@ public void canRemoveThroughSegmentIteration() throws Exception {
if (!Arrays.equals(key, bytesOf(4))) store.tryDelete(barSegment, key);
});

for (final ColumnFamilyHandle segment : Set.of(fooSegment, barSegment)) {
for (final RocksDbSegmentIdentifier segment : Set.of(fooSegment, barSegment)) {
assertThat(store.streamKeys(segment).count()).isEqualTo(1);
}

Expand All @@ -147,11 +145,11 @@ public void canRemoveThroughSegmentIteration() throws Exception {

@Test
public void canGetThroughSegmentIteration() throws Exception {
final SegmentedKeyValueStorage<ColumnFamilyHandle> store = createSegmentedStore();
final ColumnFamilyHandle fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO).get();
final ColumnFamilyHandle barSegment = store.getSegmentIdentifierByName(TestSegment.BAR).get();
final SegmentedKeyValueStorage<RocksDbSegmentIdentifier> store = createSegmentedStore();
final RocksDbSegmentIdentifier fooSegment = store.getSegmentIdentifierByName(TestSegment.FOO);
final RocksDbSegmentIdentifier barSegment = store.getSegmentIdentifierByName(TestSegment.BAR);

final Transaction<ColumnFamilyHandle> tx = store.startTransaction();
final Transaction<RocksDbSegmentIdentifier> tx = store.startTransaction();
tx.put(fooSegment, bytesOf(1), bytesOf(1));
tx.put(fooSegment, bytesOf(2), bytesOf(2));
tx.put(fooSegment, bytesOf(3), bytesOf(3));
Expand Down Expand Up @@ -201,7 +199,8 @@ public byte[] getId() {
}
}

private SegmentedKeyValueStorage<ColumnFamilyHandle> createSegmentedStore() throws Exception {
private SegmentedKeyValueStorage<RocksDbSegmentIdentifier> createSegmentedStore()
throws Exception {
return new RocksDBColumnarKeyValueStorage(
new RocksDBConfigurationBuilder().databaseDir(folder.newFolder().toPath()).build(),
Arrays.asList(TestSegment.FOO, TestSegment.BAR),
Expand Down
Loading

0 comments on commit 0fdd55d

Please sign in to comment.