Skip to content

Commit

Permalink
Merge pull request #188 from chainbound/naman/fix/missed-block
Browse files Browse the repository at this point in the history
fix: commitment deadline in case of missed block
  • Loading branch information
mempirate authored Aug 7, 2024
2 parents 949ea17 + 933a1d3 commit 7e2022d
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 34 deletions.
59 changes: 47 additions & 12 deletions bolt-sidecar/src/driver.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use core::fmt;
use std::time::{Duration, Instant};

use alloy::{
rpc::types::beacon::events::HeadEvent,
signers::{local::PrivateKeySigner, Signer as SignerECDSA},
};
use beacon_api_client::mainnet::Client as BeaconClient;
use ethereum_consensus::{
clock::{self, SlotStream, SystemTimeProvider},
phase0::mainnet::SLOTS_PER_EPOCH,
};
use futures::StreamExt;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};

Expand All @@ -24,7 +30,6 @@ use crate::{
};

/// The driver for the sidecar, responsible for managing the main event loop.
#[derive(Debug)]
pub struct SidecarDriver<C, BLS, ECDSA> {
head_tracker: HeadTracker,
execution: ExecutionState<C>,
Expand All @@ -35,6 +40,24 @@ pub struct SidecarDriver<C, BLS, ECDSA> {
mevboost_client: MevBoostClient,
api_events_rx: mpsc::Receiver<CommitmentEvent>,
payload_requests_rx: mpsc::Receiver<FetchPayloadRequest>,
/// Stream of slots made from the consensus clock
slot_stream: SlotStream<SystemTimeProvider>,
}

impl fmt::Debug for SidecarDriver<StateClient, BlsSigner, PrivateKeySigner> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SidecarDriver")
.field("head_tracker", &self.head_tracker)
.field("execution", &self.execution)
.field("consensus", &self.consensus)
.field("constraint_signer", &self.constraint_signer)
.field("commitment_signer", &self.commitment_signer)
.field("local_builder", &self.local_builder)
.field("mevboost_client", &self.mevboost_client)
.field("api_events_rx", &self.api_events_rx)
.field("payload_requests_rx", &self.payload_requests_rx)
.finish()
}
}

impl SidecarDriver<StateClient, BlsSigner, PrivateKeySigner> {
Expand Down Expand Up @@ -74,17 +97,27 @@ impl<C: StateFetcher, BLS: SignerBLS, ECDSA: SignerECDSA> SidecarDriver<C, BLS,
let (api_events_tx, api_events_rx) = mpsc::channel(1024);
CommitmentsApiServer::new(api_addr).run(api_events_tx).await;

// TODO: this can be replaced with ethereum_consensus::clock::from_system_time()
// but using beacon node events is easier to work on a custom devnet for now
// (as we don't need to specify genesis time and slot duration)
let head_tracker = HeadTracker::start(beacon_client.clone());

// TODO: head tracker initializes the genesis timestamp with '0' value
// we should add an async fn to fetch the value for safety
// Initialize the consensus clock.
let consensus_clock = clock::from_system_time(
head_tracker.beacon_genesis_timestamp(),
cfg.chain.slot_time(),
SLOTS_PER_EPOCH,
);
let slot_stream = consensus_clock.into_stream();

let consensus = ConsensusState::new(
beacon_client.clone(),
beacon_client,
cfg.validator_indexes.clone(),
cfg.chain.commitment_deadline(),
);

// TODO: this can be replaced with ethereum_consensus::clock::from_system_time()
// but using beacon node events is easier to work on a custom devnet for now
// (as we don't need to specify genesis time and slot duration)
let head_tracker = HeadTracker::start(beacon_client);

let builder_proxy_cfg = BuilderProxyConfig {
mevboost_url: cfg.mevboost_url.clone(),
server_port: cfg.mevboost_proxy_port,
Expand All @@ -110,6 +143,7 @@ impl<C: StateFetcher, BLS: SignerBLS, ECDSA: SignerECDSA> SidecarDriver<C, BLS,
mevboost_client,
api_events_rx,
payload_requests_rx,
slot_stream,
})
}

Expand All @@ -132,6 +166,11 @@ impl<C: StateFetcher, BLS: SignerBLS, ECDSA: SignerECDSA> SidecarDriver<C, BLS,
Some(payload_request) = self.payload_requests_rx.recv() => {
self.handle_fetch_payload_request(payload_request);
}
Some(slot) = self.slot_stream.next() => {
if let Err(e) = self.consensus.update_slot(slot).await {
error!(err = ?e, "Failed to update consensus state slot");
}
}
}
}
}
Expand Down Expand Up @@ -191,7 +230,7 @@ impl<C: StateFetcher, BLS: SignerBLS, ECDSA: SignerECDSA> SidecarDriver<C, BLS,
};
}

/// Handle a new head event, updating the execution and consensus state.
/// Handle a new head event, updating the execution state.
async fn handle_new_head_event(&mut self, head_event: HeadEvent) {
let slot = head_event.slot;
info!(slot, "Received new head event");
Expand All @@ -200,10 +239,6 @@ impl<C: StateFetcher, BLS: SignerBLS, ECDSA: SignerECDSA> SidecarDriver<C, BLS,
if let Err(e) = self.execution.update_head(None, slot).await {
error!(err = ?e, "Failed to update execution state head");
}

if let Err(e) = self.consensus.update_head(slot).await {
error!(err = ?e, "Failed to update consensus state head");
}
}

/// Handle a commitment deadline event, submitting constraints to the MEV-Boost service.
Expand Down
72 changes: 56 additions & 16 deletions bolt-sidecar/src/state/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::{
time::{Duration, Instant},
};

use beacon_api_client::{mainnet::Client, BlockId, ProposerDuty};
use ethereum_consensus::{deneb::BeaconBlockHeader, phase0::mainnet::SLOTS_PER_EPOCH};
use beacon_api_client::{mainnet::Client, ProposerDuty};
use ethereum_consensus::phase0::mainnet::SLOTS_PER_EPOCH;

use super::CommitmentDeadline;
use crate::{
Expand Down Expand Up @@ -41,7 +41,6 @@ pub struct Epoch {
#[allow(missing_debug_implementations)]
pub struct ConsensusState {
beacon_api_client: Client,
header: BeaconBlockHeader,
epoch: Epoch,
validator_indexes: ValidatorIndexes,
// Timestamp of when the latest slot was received
Expand All @@ -62,7 +61,6 @@ pub struct ConsensusState {
impl fmt::Debug for ConsensusState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConsensusState")
.field("header", &self.header)
.field("epoch", &self.epoch)
.field("latest_slot", &self.latest_slot)
.field("latest_slot_timestamp", &self.latest_slot_timestamp)
Expand All @@ -81,7 +79,6 @@ impl ConsensusState {
ConsensusState {
beacon_api_client,
validator_indexes,
header: BeaconBlockHeader::default(),
epoch: Epoch::default(),
latest_slot: Default::default(),
latest_slot_timestamp: Instant::now(),
Expand Down Expand Up @@ -117,21 +114,16 @@ impl ConsensusState {
}

/// Update the latest head and fetch the relevant data from the beacon chain.
pub async fn update_head(&mut self, head: u64) -> Result<(), ConsensusError> {
pub async fn update_slot(&mut self, slot: u64) -> Result<(), ConsensusError> {
// Reset the commitment deadline to start counting for the next slot.
self.commitment_deadline =
CommitmentDeadline::new(head + 1, self.commitment_deadline_duration);

let update = self.beacon_api_client.get_beacon_header(BlockId::Slot(head)).await?;

self.header = update.header.message;
CommitmentDeadline::new(slot + 1, self.commitment_deadline_duration);

// Update the timestamp with current time
self.latest_slot_timestamp = Instant::now();
self.latest_slot = head;
self.latest_slot = slot;

// Get the current value of slot and epoch
let slot = self.header.slot;
// Calculate the current value of epoch
let epoch = slot / SLOTS_PER_EPOCH;

// If the epoch has changed, update the proposer duties
Expand Down Expand Up @@ -169,9 +161,12 @@ impl ConsensusState {

#[cfg(test)]
mod tests {
use super::*;
use beacon_api_client::ProposerDuty;
use reqwest::Url;
use tracing::warn;

use super::*;
use crate::test_util::try_get_beacon_api_url;

#[tokio::test]
async fn test_find_validator_index_for_slot() {
Expand All @@ -188,7 +183,6 @@ mod tests {
// Create a ConsensusState with the sample proposer duties and validator indexes
let state = ConsensusState {
beacon_api_client: Client::new(Url::parse("http://localhost").unwrap()),
header: BeaconBlockHeader::default(),
epoch: Epoch { value: 0, start_slot: 0, proposer_duties },
latest_slot_timestamp: Instant::now(),
commitment_deadline: CommitmentDeadline::new(0, Duration::from_secs(1)),
Expand All @@ -207,4 +201,50 @@ mod tests {
Err(ConsensusError::ValidatorNotFound)
));
}

#[tokio::test]
async fn test_update_slot() -> eyre::Result<()> {
let _ = tracing_subscriber::fmt::try_init();

let commitment_deadline_duration = Duration::from_secs(1);
let validator_indexes = ValidatorIndexes::from(vec![100, 101, 102]);

let Some(url) = try_get_beacon_api_url().await else {
warn!("skipping test: beacon API URL is not reachable");
return Ok(());
};

let beacon_client = BeaconClient::new(Url::parse(url).unwrap());

// Create the initial ConsensusState
let mut state = ConsensusState {
beacon_api_client: beacon_client,
epoch: Epoch::default(),
latest_slot: Default::default(),
latest_slot_timestamp: Instant::now(),
validator_indexes,
commitment_deadline: CommitmentDeadline::new(0, commitment_deadline_duration),
commitment_deadline_duration,
};

// Update the slot to 32
state.update_slot(32).await.unwrap();

// Check values were updated correctly
assert_eq!(state.latest_slot, 32);
assert!(state.latest_slot_timestamp.elapsed().as_secs() < 1);
assert_eq!(state.epoch.value, 1);
assert_eq!(state.epoch.start_slot, 32);

// Update the slot to 63, which should not update the epoch
state.update_slot(63).await.unwrap();

// Check values were updated correctly
assert_eq!(state.latest_slot, 63);
assert!(state.latest_slot_timestamp.elapsed().as_secs() < 1);
assert_eq!(state.epoch.value, 1);
assert_eq!(state.epoch.start_slot, 32);

Ok(())
}
}
44 changes: 38 additions & 6 deletions bolt-sidecar/src/state/head_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
use std::time::Duration;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};

use alloy::rpc::types::beacon::events::HeadEvent;
use beacon_api_client::Topic;
use futures::StreamExt;
use tokio::{sync::broadcast, task::AbortHandle};
use tokio::{sync::broadcast, task::AbortHandle, time::sleep};
use tracing::warn;

use crate::BeaconClient;

/// The delay between retries when attempting to reconnect to the beacon client
const RETRY_DELAY: Duration = Duration::from_secs(1);

/// Simple actor to keep track of the most recent head of the beacon chain
/// and broadcast updates to its subscribers.
///
Expand All @@ -17,6 +26,8 @@ use crate::BeaconClient;
pub struct HeadTracker {
/// Channel to receive updates of the "Head" beacon topic
new_heads_rx: broadcast::Receiver<HeadEvent>,
/// The genesis timestamp of the beacon chain, used for calculating proposal times
beacon_genesis_timestamp: Arc<AtomicU64>,
/// Handle to the background task that listens for new head events.
/// Kept to allow for graceful shutdown.
quit: AbortHandle,
Expand All @@ -38,13 +49,29 @@ impl HeadTracker {
pub fn start(beacon_client: BeaconClient) -> Self {
let (new_heads_tx, new_heads_rx) = broadcast::channel(32);

let beacon_genesis_timestamp = Arc::new(AtomicU64::new(0));
let beacon_genesis_timestamp_clone = beacon_genesis_timestamp.clone();

let task = tokio::spawn(async move {
loop {
// First, try to get the genesis timestamp and cache it.
let genesis_time = loop {
match beacon_client.get_genesis_details().await {
Ok(genesis_info) => break genesis_info.genesis_time,
Err(err) => {
warn!(?err, "failed to get genesis details");
sleep(RETRY_DELAY).await;
continue;
}
}
};
beacon_genesis_timestamp_clone.store(genesis_time, Ordering::Relaxed);

let mut event_stream = match beacon_client.get_events::<NewHeadsTopic>().await {
Ok(events) => events,
Err(err) => {
warn!(?err, "failed to subscribe to new heads topic, retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
sleep(RETRY_DELAY).await;
continue;
}
};
Expand All @@ -53,12 +80,12 @@ impl HeadTracker {
Some(Ok(event)) => event,
Some(Err(err)) => {
warn!(?err, "error reading new head event stream, retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
sleep(RETRY_DELAY).await;
continue;
}
None => {
warn!("new head event stream ended, retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
sleep(RETRY_DELAY).await;
continue;
}
};
Expand All @@ -69,7 +96,7 @@ impl HeadTracker {
}
});

Self { new_heads_rx, quit: task.abort_handle() }
Self { new_heads_rx, beacon_genesis_timestamp, quit: task.abort_handle() }
}

/// Stop the tracker and cleanup resources
Expand All @@ -89,6 +116,11 @@ impl HeadTracker {
pub fn subscribe_new_heads(&self) -> broadcast::Receiver<HeadEvent> {
self.new_heads_rx.resubscribe()
}

/// Get the genesis timestamp of the beacon chain
pub fn beacon_genesis_timestamp(&self) -> u64 {
self.beacon_genesis_timestamp.load(Ordering::Relaxed)
}
}

#[cfg(test)]
Expand Down

0 comments on commit 7e2022d

Please sign in to comment.