Skip to content

Commit

Permalink
Log full peers list on connect/disconnect (hyperledger#3745)
Browse files Browse the repository at this point in the history
* added logging of EthPeers

* RlpxAgent logging of peers

Signed-off-by: Sally MacFarlane <[email protected]>
  • Loading branch information
macfarla authored Apr 25, 2022
1 parent e01f63b commit c0dafd6
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.manager;

import static com.google.common.base.Preconditions.checkArgument;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Difficulty;
Expand Down Expand Up @@ -54,14 +55,15 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nonnull;

import com.google.common.annotations.VisibleForTesting;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EthPeer {
public class EthPeer implements Comparable<EthPeer> {
private static final Logger LOG = LoggerFactory.getLogger(EthPeer.class);

private static final int MAX_OUTSTANDING_REQUESTS = 5;
Expand Down Expand Up @@ -395,7 +397,14 @@ public Map<Integer, AtomicInteger> timeoutCounts() {
return reputation.timeoutCounts();
}

public PeerReputation getReputation() {
return reputation;
}

void handleDisconnect() {
traceLambda(
LOG, "handleDisconnect - peer... {}, {}", this::getShortNodeId, this::getReputation);

requestManagers.forEach(
(protocolName, map) -> map.forEach((code, requestManager) -> requestManager.close()));
}
Expand Down Expand Up @@ -522,7 +531,28 @@ public boolean hasSupportForMessage(final int messageCode) {

@Override
public String toString() {
return String.format("Peer %s...", nodeId().toString().substring(0, 20));
return String.format(
"Peer %s... %s, validated? %s, disconnected? %s",
getShortNodeId(), reputation, isFullyValidated(), isDisconnected());
}

@Nonnull
public String getShortNodeId() {
return nodeId().toString().substring(0, 20);
}

@Override
public int compareTo(final @Nonnull EthPeer ethPeer) {
int repCompare = this.reputation.compareTo(ethPeer.reputation);
if (repCompare != 0) return repCompare;

int headStateCompare =
Long.compare(
this.chainHeadState.getBestBlock().getNumber(),
ethPeer.chainHeadState.getBestBlock().getNumber());
if (headStateCompare != 0) return headStateCompare;

return getConnection().getPeerInfo().compareTo(ethPeer.getConnection().getPeerInfo());
}

@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,15 @@ public interface ConnectCallback {

@Override
public String toString() {
if (connections.isEmpty()) {
return "0 EthPeers {}";
}
final String connectionsList =
connections.values().stream().map(EthPeer::toString).collect(Collectors.joining(","));
return "EthPeers{connections=" + connectionsList + '}';
connections.values().stream()
.sorted()
.map(EthPeer::toString)
.collect(Collectors.joining(", \n"));
return connections.size() + " EthPeers {\n" + connectionsList + '}';
}

private void invokeConnectionCallbacks(final EthPeer peer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ public void handleNewConnection(final PeerConnection connection) {
} catch (final PeerNotConnected peerNotConnected) {
// Nothing to do.
}
LOG.trace("{}", ethPeers);
}

@Override
Expand All @@ -345,6 +346,7 @@ public void handleDisconnect(
reason,
connection.getPeerInfo(),
ethPeers.peerCount());
LOG.trace("{}", ethPeers);
}

private void handleStatusMessage(final EthPeer peer, final MessageData data) {
Expand Down Expand Up @@ -374,7 +376,7 @@ private void handleStatusMessage(final EthPeer peer, final MessageData data) {
}
} catch (final RLPException e) {
LOG.debug("Unable to parse status message.", e);
// Parsing errors can happen when clients broadcast network ids outside of the int range,
// Parsing errors can happen when clients broadcast network ids outside the int range,
// So just disconnect with "subprotocol" error rather than "breach of protocol".
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PeerReputation {
public class PeerReputation implements Comparable<PeerReputation> {
private static final Logger LOG = LoggerFactory.getLogger(PeerReputation.class);
private static final int TIMEOUT_THRESHOLD = 3;
private static final int USELESS_RESPONSE_THRESHOLD = 5;
Expand All @@ -39,12 +40,20 @@ public class PeerReputation {
new ConcurrentHashMap<>();
private final Queue<Long> uselessResponseTimes = new ConcurrentLinkedQueue<>();

private static final int DEFAULT_SCORE = 100;
private static final int SMALL_ADJUSTMENT = 1;
private static final int LARGE_ADJUSTMENT = 10;

private int score = DEFAULT_SCORE;

public Optional<DisconnectReason> recordRequestTimeout(final int requestCode) {
final int newTimeoutCount = getOrCreateTimeoutCount(requestCode).incrementAndGet();
if (newTimeoutCount >= TIMEOUT_THRESHOLD) {
LOG.debug("Disconnection triggered by repeated timeouts");
score -= LARGE_ADJUSTMENT;
return Optional.of(DisconnectReason.TIMEOUT);
} else {
score -= SMALL_ADJUSTMENT;
return Optional.empty();
}
}
Expand All @@ -67,14 +76,26 @@ public Optional<DisconnectReason> recordUselessResponse(final long timestamp) {
uselessResponseTimes.poll();
}
if (uselessResponseTimes.size() >= USELESS_RESPONSE_THRESHOLD) {
score -= LARGE_ADJUSTMENT;
LOG.debug("Disconnection triggered by exceeding useless response threshold");
return Optional.of(DisconnectReason.USELESS_PEER);
} else {
score -= SMALL_ADJUSTMENT;
return Optional.empty();
}
}

private boolean shouldRemove(final Long timestamp, final long currentTimestamp) {
return timestamp != null && timestamp + USELESS_RESPONSE_WINDOW_IN_MILLIS < currentTimestamp;
}

@Override
public String toString() {
return String.format("PeerReputation " + score);
}

@Override
public int compareTo(final @Nonnull PeerReputation otherReputation) {
return Integer.compare(this.score, otherReputation.score);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,5 +215,28 @@ public static EthStatus readFrom(final RLPInput in) {
return new EthStatus(
protocolVersion, networkId, totalDifficulty, bestHash, genesisHash, forkId);
}

@Override
public String toString() {
return "EthStatus{"
+ "protocolVersion="
+ protocolVersion
+ ", networkId="
+ networkId
+ ", totalDifficulty="
+ totalDifficulty
+ ", bestHash="
+ bestHash
+ ", genesisHash="
+ genesisHash
+ ", forkId="
+ forkId
+ '}';
}
}

@Override
public String toString() {
return status().toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.PeerInfo;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.PingMessage;
import org.hyperledger.besu.plugin.services.permissioning.NodeMessagePermissioningProvider;
import org.hyperledger.besu.testutil.TestClock;
Expand All @@ -47,11 +48,13 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.tuweni.bytes.Bytes;
import org.junit.Test;

public class EthPeerTest {
private static final BlockDataGenerator gen = new BlockDataGenerator();
private final TestClock clock = new TestClock();
private static final Bytes NODE_ID = Bytes.random(32);

@Test
public void getHeadersStream() throws PeerNotConnected {
Expand Down Expand Up @@ -342,6 +345,22 @@ public void message_permissioning_any_false_permission_preventsMessageFromSendin
verify(peer.getConnection(), times(0)).sendForProtocol(any(), eq(PingMessage.get()));
}

@Test
public void compareTo_withSameNodeId() {
final EthPeer peer1 = createPeerWithPeerInfo(NODE_ID);
final EthPeer peer2 = createPeerWithPeerInfo(NODE_ID);
assertThat(peer1.compareTo(peer2)).isEqualTo(0);
assertThat(peer2.compareTo(peer1)).isEqualTo(0);
}

@Test
public void compareTo_withDifferentNodeId() {
final EthPeer peer1 = createPeerWithPeerInfo(NODE_ID);
final EthPeer peer2 = createPeerWithPeerInfo(Bytes.fromHexString("0x00"));
assertThat(peer1.compareTo(peer2)).isEqualTo(1);
assertThat(peer2.compareTo(peer1)).isEqualTo(-1);
}

private void messageStream(
final ResponseStreamSupplier getStream,
final MessageData targetMessage,
Expand Down Expand Up @@ -431,6 +450,22 @@ private EthPeer createPeer(final List<PeerValidator> peerValidators) {
return createPeer(peerValidators, Collections.emptyList());
}

private EthPeer createPeerWithPeerInfo(final Bytes nodeId) {
final PeerConnection peerConnection = mock(PeerConnection.class);
final Consumer<EthPeer> onPeerReady = (peer) -> {};
// Use a non-eth protocol name to ensure that EthPeer with sub-protocols such as Istanbul
// that extend the sub-protocol work correctly
PeerInfo peerInfo = new PeerInfo(1, "clientId", Collections.emptyList(), 30303, nodeId);
when(peerConnection.getPeerInfo()).thenReturn(peerInfo);
return new EthPeer(
peerConnection,
"foo",
onPeerReady,
Collections.emptyList(),
clock,
Collections.emptyList());
}

private EthPeer createPeer(
final List<PeerValidator> peerValidators,
final List<NodeMessagePermissioningProvider> permissioningProviders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;

import java.util.Collections;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CancellationException;
Expand Down Expand Up @@ -264,6 +265,18 @@ public void shouldFailRequestWithBusyDisconnectedAssignedPeer() throws Exception
assertRequestFailure(pendingRequest, CancellationException.class);
}

@Test
public void toString_hasExpectedInfo() {
assertThat(ethPeers.toString()).isEqualTo("0 EthPeers {}");

final EthPeer peerA =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, Difficulty.of(50), 20)
.getEthPeer();
ethPeers.registerConnection(peerA.getConnection(), Collections.emptyList());
assertThat(ethPeers.toString()).contains("1 EthPeers {");
assertThat(ethPeers.toString()).contains(peerA.getShortNodeId());
}

private void freeUpCapacity(final EthPeer ethPeer) {
ethPeers.dispatchMessage(ethPeer, new EthMessage(ethPeer, NodeDataMessage.create(emptyList())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,25 @@ public void serializeDeserializeWithForkId() {
assertThat(copy.forkId()).isEqualTo(forkId);
}

@Test
public void toStringHasExpectedInfo() {
final int version = EthProtocol.EthVersion.V64;
final BigInteger networkId = BigInteger.ONE;
final Difficulty td = Difficulty.of(1000L);
final Hash bestHash = randHash(1L);
final Hash genesisHash = randHash(2L);
final ForkId forkId = new ForkId(Bytes.fromHexString("0xa00bc334"), 0L);

final MessageData msg =
StatusMessage.create(version, networkId, td, bestHash, genesisHash, forkId);

final StatusMessage copy = new StatusMessage(msg.getData());
final String copyToString = copy.toString();

assertThat(copyToString).contains("bestHash=" + bestHash);
assertThat(copyToString).contains("genesisHash=" + genesisHash);
}

private Hash randHash(final long seed) {
final Random random = new Random(seed);
final byte[] bytes = new byte[32];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.isNull;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;

import org.hyperledger.besu.crypto.NodeKey;
import org.hyperledger.besu.crypto.SECPPublicKey;
Expand Down Expand Up @@ -180,6 +181,14 @@ public void connect(final Stream<? extends Peer> peerStream) {
.forEach(this::connect);
}

private String logConnectionsByIdToString() {
final String connectionsList =
connectionsById.values().stream()
.map(RlpxConnection::toString)
.collect(Collectors.joining(",\n"));
return connectionsById.size() + " ConnectionsById {\n" + connectionsList + "}";
}

public void disconnect(final Bytes peerId, final DisconnectReason reason) {
final RlpxConnection connection = connectionsById.remove(peerId);
if (connection != null) {
Expand Down Expand Up @@ -227,7 +236,7 @@ public CompletableFuture<PeerConnection> connect(final Peer peer) {
// Check max peers
if (!peerPrivileges.canExceedConnectionLimits(peer) && getConnectionCount() >= maxConnections) {
final String errorMsg =
"Max peer peer connections established ("
"Max peer connections established ("
+ maxConnections
+ "). Cannot connect to peer: "
+ peer;
Expand Down Expand Up @@ -263,6 +272,8 @@ public CompletableFuture<PeerConnection> connect(final Peer peer) {
}
});

traceLambda(LOG, "{}", this::logConnectionsByIdToString);

return connectionFuture.get();
}

Expand All @@ -276,6 +287,7 @@ private void handleDisconnect(
final PeerConnection peerConnection,
final DisconnectReason disconnectReason,
final boolean initiatedByPeer) {
traceLambda(LOG, "{}", this::logConnectionsByIdToString);
cleanUpPeerConnection(peerConnection.getPeer().getId());
}

Expand Down Expand Up @@ -416,6 +428,7 @@ && getConnectionCount() >= maxConnections) {
// Check remote connections again to control for race conditions
enforceRemoteConnectionLimits();
enforceConnectionLimits();
traceLambda(LOG, "{}", this::logConnectionsByIdToString);
}

private boolean shouldLimitRemoteConnections() {
Expand Down Expand Up @@ -518,6 +531,7 @@ private int compareDuplicateConnections(final RlpxConnection a, final RlpxConnec

final Bytes peerId = a.getPeer().getId();
final Bytes localId = localNode.getPeer().getId();
// at this point a.Id == b.Id
if (a.initiatedRemotely() != b.initiatedRemotely()) {
// If we have connections initiated in different directions, keep the connection initiated
// by the node with the lower id
Expand All @@ -528,6 +542,7 @@ private int compareDuplicateConnections(final RlpxConnection a, final RlpxConnec
}
}
// Otherwise, keep older connection
LOG.info("comparing timestamps " + a.getInitiatedAt() + " with " + b.getInitiatedAt());
return Math.toIntExact(a.getInitiatedAt() - b.getInitiatedAt());
}

Expand Down
Loading

0 comments on commit c0dafd6

Please sign in to comment.