Skip to content

Commit

Permalink
Fix a concurrency bug in the block processor - apply the same solution to the validator list updater.
Browse files Browse the repository at this point in the history
  • Loading branch information
kukabi committed Apr 6, 2022
1 parent 2e0adf3 commit eb0584a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 27 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/rust_checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ jobs:
components: clippy, rustfmt
override: true
- run: rustup update
- run: rustup update nightly-2022-03-14
- run: rustup default nightly-2022-03-14
- run: rustup target add wasm32-unknown-unknown --toolchain nightly-2022-03-14
- run: rustup update nightly #-2022-03-14
- run: rustup default nightly #-2022-03-14
- run: rustup target add wasm32-unknown-unknown --toolchain nightly #-2022-03-14
- run: rustup default stable
- name: Format
uses: actions-rs/cargo@v1
Expand All @@ -45,7 +45,7 @@ jobs:
- name: Clippy
uses: actions-rs/cargo@v1
with:
toolchain: nightly-2022-03-14
toolchain: nightly #-2022-03-14
command: clippy
args: --all-targets -- -D warnings -W clippy::cognitive_complexity
- name: Test
Expand Down
35 changes: 22 additions & 13 deletions subvt-block-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod metrics;

lazy_static! {
static ref CONFIG: Config = Config::default();
static ref IS_BUSY: AtomicBool = AtomicBool::new(false);
}

#[derive(Default)]
Expand Down Expand Up @@ -359,6 +360,15 @@ impl Service for BlockProcessor {

async fn run(&'static self) -> anyhow::Result<()> {
loop {
if IS_BUSY.load(Ordering::SeqCst) {
let delay_seconds = CONFIG.common.recovery_retry_seconds;
log::warn!(
"Busy processing past blocks. Hold re-instantiation for {} seconds.",
delay_seconds
);
tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await;
continue;
}
let error_cell: Arc<OnceCell<anyhow::Error>> = Arc::new(OnceCell::new());
let block_subscription_substrate_client = SubstrateClient::new(&CONFIG).await?;
let block_processor_substrate_client =
Expand All @@ -367,7 +377,6 @@ impl Service for BlockProcessor {
let postgres = Arc::new(
PostgreSQLNetworkStorage::new(&CONFIG, CONFIG.get_network_postgres_url()).await?,
);
let is_indexing_past_blocks = Arc::new(AtomicBool::new(false));

block_subscription_substrate_client.subscribe_to_finalized_blocks(
CONFIG.substrate.request_timeout_seconds,
Expand All @@ -384,26 +393,27 @@ impl Service for BlockProcessor {
}
};
metrics::target_finalized_block_number().set(finalized_block_number as i64);
if IS_BUSY.load(Ordering::SeqCst) {
log::debug!("Busy processing past blocks. Skip block #{} for now.", finalized_block_number);
return Ok(());
}

let block_processor_substrate_client = block_processor_substrate_client.clone();
let runtime_information = runtime_information.clone();
let postgres = postgres.clone();
if is_indexing_past_blocks.load(Ordering::SeqCst) {
log::debug!("Busy indexing past blocks. Skip block #{} for now.", finalized_block_number);
return Ok(());
}
let is_indexing_past_blocks = Arc::clone(&is_indexing_past_blocks);
IS_BUSY.store(true, Ordering::SeqCst);
tokio::spawn(async move {
let mut block_processor_substrate_client = block_processor_substrate_client.lock().await;
let processed_block_height = match postgres.get_processed_block_height().await {
Ok(processed_block_height) => processed_block_height,
Err(error) => {
log::error!("Cannot get processed block height from the database: {:?}", error);
let _ = error_cell.set(error);
IS_BUSY.store(false, Ordering::SeqCst);
return;
}
};
if processed_block_height < (finalized_block_number - 1) {
is_indexing_past_blocks.store(true, Ordering::SeqCst);
let mut block_number = std::cmp::max(
processed_block_height,
CONFIG.block_processor.start_block_number
Expand All @@ -415,15 +425,15 @@ impl Service for BlockProcessor {
finalized_block_number
);
let start = std::time::Instant::now();
let update_result = self.process_block(
let process_result = self.process_block(
&mut block_processor_substrate_client,
&runtime_information,
&postgres,
block_number,
false,
).await;
metrics::block_processing_time_ms().observe(start.elapsed().as_millis() as f64);
match update_result {
match process_result {
Ok(_) => {
metrics::processed_finalized_block_number().set(block_number as i64);
block_number += 1
Expand All @@ -434,13 +444,11 @@ impl Service for BlockProcessor {
"History block processing failed for block #{}.",
block_number,
);
is_indexing_past_blocks.store(false, Ordering::SeqCst);
let _ = error_cell.set(error);
return;
break;
}
}
}
is_indexing_past_blocks.store(false, Ordering::SeqCst);
} else {
// update current era reward points every 3 minutes
let blocks_per_3_minutes = 3 * 60 * 1000
Expand Down Expand Up @@ -472,6 +480,7 @@ impl Service for BlockProcessor {
}
}
}
IS_BUSY.store(false, Ordering::SeqCst);
});
Ok(())
}).await;
Expand All @@ -480,7 +489,7 @@ impl Service for BlockProcessor {
"Finalized block subscription exited. Will refresh connection and subscription after {} seconds.",
delay_seconds
);
std::thread::sleep(std::time::Duration::from_secs(delay_seconds));
tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await;
}
}
}
2 changes: 1 addition & 1 deletion subvt-persistence/src/postgres/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ impl PostgreSQLNetworkStorage {
&self,
era_index: u32,
session_index: u64,
validator_account_ids: &Vec<&AccountId>,
validator_account_ids: &[&AccountId],
) -> anyhow::Result<()> {
let mut transaction = self.connection_pool.begin().await?;
for validator_account_id in validator_account_ids {
Expand Down
24 changes: 15 additions & 9 deletions subvt-validator-list-updater/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod metrics;

lazy_static! {
static ref CONFIG: Config = Config::default();
static ref IS_BUSY: AtomicBool = AtomicBool::new(false);
}

const HISTORY_BLOCK_DEPTH: u64 = 3;
Expand Down Expand Up @@ -246,9 +247,7 @@ impl ValidatorListUpdater {
Ok(validators)
}

async fn store_processed_block_numbers(
processed_block_numbers: &Vec<u64>,
) -> anyhow::Result<()> {
async fn store_processed_block_numbers(processed_block_numbers: &[u64]) -> anyhow::Result<()> {
let redis_client = redis::Client::open(CONFIG.redis.url.as_str())?;
let mut redis_connection = redis_client.get_async_connection().await.context(format!(
"Cannot connect to Redis at URL {}.",
Expand Down Expand Up @@ -305,12 +304,20 @@ impl Service for ValidatorListUpdater {

async fn run(&'static self) -> anyhow::Result<()> {
loop {
if IS_BUSY.load(Ordering::SeqCst) {
let delay_seconds = CONFIG.common.recovery_retry_seconds;
log::warn!(
"Busy processing past state. Hold re-instantiation for {} seconds.",
delay_seconds
);
tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await;
continue;
}
let error_cell: Arc<OnceCell<anyhow::Error>> = Arc::new(OnceCell::new());
let postgres = Arc::new(
PostgreSQLNetworkStorage::new(&CONFIG, CONFIG.get_network_postgres_url()).await?,
);
let substrate_client = Arc::new(SubstrateClient::new(&CONFIG).await?);
let is_busy = Arc::new(AtomicBool::new(false));
let processed_block_numbers: Arc<RwLock<Vec<u64>>> = Arc::new(RwLock::new(
ValidatorListUpdater::fetch_processed_block_numbers().await?,
));
Expand All @@ -329,15 +336,14 @@ impl Service for ValidatorListUpdater {
}
};
metrics::target_finalized_block_number().set(finalized_block_number as i64);
if is_busy.load(Ordering::SeqCst) {
if IS_BUSY.load(Ordering::SeqCst) {
log::debug!("Busy processing a past block. Skip block #{}.", finalized_block_number);
return Ok(());
}
is_busy.store(true, Ordering::SeqCst);
IS_BUSY.store(true, Ordering::SeqCst);
let processed_block_numbers = processed_block_numbers.clone();
let substrate_client = Arc::clone(&substrate_client);
let postgres = postgres.clone();
let is_busy = Arc::clone(&is_busy);
tokio::spawn(async move {
let start = std::time::Instant::now();
let update_result = ValidatorListUpdater::fetch_and_update_validator_list(
Expand All @@ -357,7 +363,7 @@ impl Service for ValidatorListUpdater {
metrics::processing_time_ms().observe(start.elapsed().as_millis() as f64);
metrics::processed_finalized_block_number().set(finalized_block_number as i64);
}
is_busy.store(false, Ordering::SeqCst);
IS_BUSY.store(false, Ordering::SeqCst);
});
Ok(())
}).await;
Expand All @@ -366,7 +372,7 @@ impl Service for ValidatorListUpdater {
"New block subscription exited. Will refresh connection and subscription after {} seconds.",
delay_seconds
);
std::thread::sleep(std::time::Duration::from_secs(delay_seconds));
tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await;
}
}
}

0 comments on commit eb0584a

Please sign in to comment.