Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(primitives) : Chunk Implementation for Parallel Sender Recovery #5224

Closed
wants to merge 12 commits into from
Closed
Prev Previous commit
Next Next commit
Updated reth-primitives src/transactions/mod.rs
  • Loading branch information
Arindam2407 committed Nov 23, 2023
commit 887b69dee91d26ab26b707178b43343c04c87ea4
66 changes: 19 additions & 47 deletions crates/primitives/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use alloy_rlp::{
use bytes::{Buf, BytesMut};
use derive_more::{AsRef, Deref};
use once_cell::sync::Lazy;
use rayon::prelude::*;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use reth_codecs::{add_arbitrary_tests, derive_arbitrary, Compact};
use serde::{Deserialize, Serialize};
use std::{mem, sync::mpsc, thread};
use std::mem;

pub use access_list::{AccessList, AccessListItem};
pub use eip1559::TxEip1559;
Expand Down Expand Up @@ -833,16 +833,6 @@ impl TransactionSignedNoHash {
self.signature.recover_signer(signature_hash)
}

/// Recover signer helper for parallel impl.
///
/// Returns `None` if the transaction's signature is invalid, see also [Self::recover_signer].
pub fn recover_signer_helper(&self, rlp_buf: &mut Vec<u8>) -> Option<Address> {
let tx = self;
tx.transaction.encode_without_signature(rlp_buf);

tx.signature.recover_signer(keccak256(rlp_buf))
}

/// Converts into a transaction type with its hash: [`TransactionSigned`].
pub fn with_hash(self) -> TransactionSigned {
self.into()
Expand All @@ -862,15 +852,13 @@ impl TransactionSignedNoHash {
let mut recovered_signers: Vec<Address> = Vec::new();
let mut channels = Vec::new();
rayon::scope(|s| {
let (chunk_size, chunks) =
if num_txes < (rayon::current_num_threads() * rayon::current_num_threads()) {
(num_txes, 2)
} else {
let chunk_size = num_txes /
(rayon::current_num_threads() * rayon::current_num_threads());
let chunks = num_txes / chunk_size + 1;
(chunk_size, chunks)
};
let (chunk_size, chunks) = if num_txes < 16 {
(num_txes, 2)
} else {
let chunk_size = num_txes / (num_txes / 16);
let chunks = num_txes / chunk_size + 1;
(chunk_size, chunks)
};
let mut iter = txes.into_iter();
(0..chunks).for_each(|i| {
let chunk: Vec<&TransactionSignedNoHash> = if i == chunks - 1 {
Expand All @@ -884,10 +872,8 @@ impl TransactionSignedNoHash {
// This task will send the results through the channel after it recovered
// the senders.
s.spawn(move |_| {
let mut rlp_buf = Vec::with_capacity(128);
for tx in chunk {
rlp_buf.clear();
let recovery_result = tx.recover_signer_helper(&mut rlp_buf);
let recovery_result = tx.recover_signer();
let _ = recovered_senders_tx.send(recovery_result);
}
});
Expand Down Expand Up @@ -1052,16 +1038,6 @@ impl TransactionSigned {
self.signature.recover_signer(signature_hash)
}

/// Recover signer helper for parallel impl.
///
/// Returns `None` if the transaction's signature is invalid, see also [Self::recover_signer].
pub fn recover_signer_helper(&self, rlp_buf: &mut Vec<u8>) -> Option<Address> {
let tx = self;
tx.transaction.encode_without_signature(rlp_buf);

tx.signature.recover_signer(keccak256(rlp_buf))
}

/// Recovers a list of signers from a transaction list iterator
///
/// Returns `None`, if some transaction's signature is invalid, see also
Expand All @@ -1076,15 +1052,13 @@ impl TransactionSigned {
let mut recovered_signers: Vec<Address> = Vec::new();
let mut channels = Vec::new();
rayon::scope(|s| {
let (chunk_size, chunks) =
if num_txes < (rayon::current_num_threads() * rayon::current_num_threads()) {
(num_txes, 2)
} else {
let chunk_size = num_txes /
(rayon::current_num_threads() * rayon::current_num_threads());
let chunks = num_txes / chunk_size + 1;
(chunk_size, chunks)
};
let (chunk_size, chunks) = if num_txes < 16 {
(num_txes, 2)
} else {
let chunk_size = num_txes / (num_txes / 16);
let chunks = num_txes / chunk_size + 1;
(chunk_size, chunks)
};
let mut iter = txes.into_iter();
(0..chunks).for_each(|i| {
let chunk: Vec<&TransactionSigned> = if i == chunks - 1 {
Expand All @@ -1098,10 +1072,8 @@ impl TransactionSigned {
// This task will send the results through the channel after it recovered
// the senders.
s.spawn(move |_| {
let mut rlp_buf = Vec::with_capacity(128);
for tx in chunk {
rlp_buf.clear();
let recovery_result = tx.recover_signer_helper(&mut rlp_buf);
let recovery_result = tx.recover_signer();
let _ = recovered_senders_tx.send(recovery_result);
}
});
Expand Down Expand Up @@ -1363,7 +1335,7 @@ impl TransactionSigned {
/// To decode EIP-4844 transactions in `eth_sendRawTransaction`, use
/// [PooledTransactionsElement::decode_enveloped].
pub fn decode_enveloped(data: &mut &[u8]) -> alloy_rlp::Result<Self> {
if data.is_empty() {
if data.is_empty() {
return Err(RlpError::InputTooShort)
}

Expand Down