diff --git a/.github/workflows/rust_checks.yml b/.github/workflows/rust_checks.yml index f0fa048f..514fc5a0 100644 --- a/.github/workflows/rust_checks.yml +++ b/.github/workflows/rust_checks.yml @@ -33,9 +33,9 @@ jobs: components: clippy, rustfmt override: true - run: rustup update - - run: rustup update nightly-2022-03-14 - - run: rustup default nightly-2022-03-14 - - run: rustup target add wasm32-unknown-unknown --toolchain nightly-2022-03-14 + - run: rustup update nightly #-2022-03-14 + - run: rustup default nightly #-2022-03-14 + - run: rustup target add wasm32-unknown-unknown --toolchain nightly #-2022-03-14 - run: rustup default stable - name: Format uses: actions-rs/cargo@v1 @@ -45,7 +45,7 @@ jobs: - name: Clippy uses: actions-rs/cargo@v1 with: - toolchain: nightly-2022-03-14 + toolchain: nightly #-2022-03-14 command: clippy args: --all-targets -- -D warnings -W clippy::cognitive_complexity - name: Test diff --git a/subvt-block-processor/src/lib.rs b/subvt-block-processor/src/lib.rs index 806ec290..b82ccafc 100644 --- a/subvt-block-processor/src/lib.rs +++ b/subvt-block-processor/src/lib.rs @@ -26,6 +26,7 @@ mod metrics; lazy_static! { static ref CONFIG: Config = Config::default(); + static ref IS_BUSY: AtomicBool = AtomicBool::new(false); } #[derive(Default)] @@ -359,6 +360,15 @@ impl Service for BlockProcessor { async fn run(&'static self) -> anyhow::Result<()> { loop { + if IS_BUSY.load(Ordering::SeqCst) { + let delay_seconds = CONFIG.common.recovery_retry_seconds; + log::warn!( + "Busy processing past blocks. Hold re-instantiation for {} seconds.", + delay_seconds + ); + tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await; + continue; + } let error_cell: Arc> = Arc::new(OnceCell::new()); let block_subscription_substrate_client = SubstrateClient::new(&CONFIG).await?; let block_processor_substrate_client = @@ -367,7 +377,6 @@ impl Service for BlockProcessor { let postgres = Arc::new( PostgreSQLNetworkStorage::new(&CONFIG, CONFIG.get_network_postgres_url()).await?, ); - let is_indexing_past_blocks = Arc::new(AtomicBool::new(false)); block_subscription_substrate_client.subscribe_to_finalized_blocks( CONFIG.substrate.request_timeout_seconds, @@ -384,14 +393,15 @@ impl Service for BlockProcessor { } }; metrics::target_finalized_block_number().set(finalized_block_number as i64); + if IS_BUSY.load(Ordering::SeqCst) { + log::debug!("Busy processing past blocks. Skip block #{} for now.", finalized_block_number); + return Ok(()); + } + let block_processor_substrate_client = block_processor_substrate_client.clone(); let runtime_information = runtime_information.clone(); let postgres = postgres.clone(); - if is_indexing_past_blocks.load(Ordering::SeqCst) { - log::debug!("Busy indexing past blocks. Skip block #{} for now.", finalized_block_number); - return Ok(()); - } - let is_indexing_past_blocks = Arc::clone(&is_indexing_past_blocks); + IS_BUSY.store(true, Ordering::SeqCst); tokio::spawn(async move { let mut block_processor_substrate_client = block_processor_substrate_client.lock().await; let processed_block_height = match postgres.get_processed_block_height().await { @@ -399,11 +409,11 @@ impl Service for BlockProcessor { Err(error) => { log::error!("Cannot get processed block height from the database: {:?}", error); let _ = error_cell.set(error); + IS_BUSY.store(false, Ordering::SeqCst); return; } }; if processed_block_height < (finalized_block_number - 1) { - is_indexing_past_blocks.store(true, Ordering::SeqCst); let mut block_number = std::cmp::max( processed_block_height, CONFIG.block_processor.start_block_number @@ -415,7 +425,7 @@ impl Service for BlockProcessor { finalized_block_number ); let start = std::time::Instant::now(); - let update_result = self.process_block( + let process_result = self.process_block( &mut block_processor_substrate_client, &runtime_information, &postgres, @@ -423,7 +433,7 @@ impl Service for BlockProcessor { false, ).await; metrics::block_processing_time_ms().observe(start.elapsed().as_millis() as f64); - match update_result { + match process_result { Ok(_) => { metrics::processed_finalized_block_number().set(block_number as i64); block_number += 1 @@ -434,13 +444,11 @@ impl Service for BlockProcessor { "History block processing failed for block #{}.", block_number, ); - is_indexing_past_blocks.store(false, Ordering::SeqCst); let _ = error_cell.set(error); - return; + break; } } } - is_indexing_past_blocks.store(false, Ordering::SeqCst); } else { // update current era reward points every 3 minutes let blocks_per_3_minutes = 3 * 60 * 1000 @@ -472,6 +480,7 @@ impl Service for BlockProcessor { } } } + IS_BUSY.store(false, Ordering::SeqCst); }); Ok(()) }).await; @@ -480,7 +489,7 @@ impl Service for BlockProcessor { "Finalized block subscription exited. Will refresh connection and subscription after {} seconds.", delay_seconds ); - std::thread::sleep(std::time::Duration::from_secs(delay_seconds)); + tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await; } } } diff --git a/subvt-persistence/src/postgres/network/mod.rs b/subvt-persistence/src/postgres/network/mod.rs index f0e6f8d6..95de308c 100644 --- a/subvt-persistence/src/postgres/network/mod.rs +++ b/subvt-persistence/src/postgres/network/mod.rs @@ -272,7 +272,7 @@ impl PostgreSQLNetworkStorage { &self, era_index: u32, session_index: u64, - validator_account_ids: &Vec<&AccountId>, + validator_account_ids: &[&AccountId], ) -> anyhow::Result<()> { let mut transaction = self.connection_pool.begin().await?; for validator_account_id in validator_account_ids { diff --git a/subvt-validator-list-updater/src/lib.rs b/subvt-validator-list-updater/src/lib.rs index 27b8c03c..76039cba 100644 --- a/subvt-validator-list-updater/src/lib.rs +++ b/subvt-validator-list-updater/src/lib.rs @@ -23,6 +23,7 @@ mod metrics; lazy_static! { static ref CONFIG: Config = Config::default(); + static ref IS_BUSY: AtomicBool = AtomicBool::new(false); } const HISTORY_BLOCK_DEPTH: u64 = 3; @@ -246,9 +247,7 @@ impl ValidatorListUpdater { Ok(validators) } - async fn store_processed_block_numbers( - processed_block_numbers: &Vec, - ) -> anyhow::Result<()> { + async fn store_processed_block_numbers(processed_block_numbers: &[u64]) -> anyhow::Result<()> { let redis_client = redis::Client::open(CONFIG.redis.url.as_str())?; let mut redis_connection = redis_client.get_async_connection().await.context(format!( "Cannot connect to Redis at URL {}.", @@ -305,12 +304,20 @@ impl Service for ValidatorListUpdater { async fn run(&'static self) -> anyhow::Result<()> { loop { + if IS_BUSY.load(Ordering::SeqCst) { + let delay_seconds = CONFIG.common.recovery_retry_seconds; + log::warn!( + "Busy processing past state. Hold re-instantiation for {} seconds.", + delay_seconds + ); + tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await; + continue; + } let error_cell: Arc> = Arc::new(OnceCell::new()); let postgres = Arc::new( PostgreSQLNetworkStorage::new(&CONFIG, CONFIG.get_network_postgres_url()).await?, ); let substrate_client = Arc::new(SubstrateClient::new(&CONFIG).await?); - let is_busy = Arc::new(AtomicBool::new(false)); let processed_block_numbers: Arc>> = Arc::new(RwLock::new( ValidatorListUpdater::fetch_processed_block_numbers().await?, )); @@ -329,15 +336,14 @@ impl Service for ValidatorListUpdater { } }; metrics::target_finalized_block_number().set(finalized_block_number as i64); - if is_busy.load(Ordering::SeqCst) { + if IS_BUSY.load(Ordering::SeqCst) { log::debug!("Busy processing a past block. Skip block #{}.", finalized_block_number); return Ok(()); } - is_busy.store(true, Ordering::SeqCst); + IS_BUSY.store(true, Ordering::SeqCst); let processed_block_numbers = processed_block_numbers.clone(); let substrate_client = Arc::clone(&substrate_client); let postgres = postgres.clone(); - let is_busy = Arc::clone(&is_busy); tokio::spawn(async move { let start = std::time::Instant::now(); let update_result = ValidatorListUpdater::fetch_and_update_validator_list( @@ -357,7 +363,7 @@ impl Service for ValidatorListUpdater { metrics::processing_time_ms().observe(start.elapsed().as_millis() as f64); metrics::processed_finalized_block_number().set(finalized_block_number as i64); } - is_busy.store(false, Ordering::SeqCst); + IS_BUSY.store(false, Ordering::SeqCst); }); Ok(()) }).await; @@ -366,7 +372,7 @@ impl Service for ValidatorListUpdater { "New block subscription exited. Will refresh connection and subscription after {} seconds.", delay_seconds ); - std::thread::sleep(std::time::Duration::from_secs(delay_seconds)); + tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await; } } }