Skip to content

Commit

Permalink
refactor codebase for artemis
Browse files Browse the repository at this point in the history
  • Loading branch information
mouseless-eth committed Jul 2, 2023
1 parent fbf2cf1 commit beefd8f
Show file tree
Hide file tree
Showing 80 changed files with 1,481 additions and 5,457 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ contract/cache
output.log
__TEMP__*
run-*.json
*.cfmms-checkpoint.json
7 changes: 2 additions & 5 deletions bot/.env.example
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
RPC_URL_WSS=ws://localhost:8545
WSS_RPC=ws://localhost:8545
SEARCHER_PRIVATE_KEY=0000000000000000000000000000000000000000000000000000000000000001
FLASHBOTS_AUTH_KEY=0000000000000000000000000000000000000000000000000000000000000002
SANDWICH_CONTRACT=0xaAaAaAaaAaAaAaaAaAAAAAAAAaaaAaAaAaaAaaAa
INTERVAL_BLOCK_NEW_POOL=50
V2_ALERT_DISCORD_WEBHOOK=...
V3_ALERT_DISCORD_WEBHOOK=...
POISON_ALERT_DISCORD_WEBHOOK=...
DISCORD_WEBHOOK=...
SANDWICH_INCEPTION_BLOCK=...
47 changes: 7 additions & 40 deletions bot/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,41 +1,8 @@
[package]
name = "rusty-sando"
version = "0.1.0"
edition = "2021"
license = "MIT"
description = "Optimized sandwich bot written using Rust and Huff"
readme = "README.md"
homepage = "https://github.com/mouseless-eth/rusty-sando"
repository = "https://github.com/mouseless-eth/rusty-sando"
keywords = ["Ethereum", "Mev", "Dex", "Sandwich"]
authors = ["0xmouseless <https://github.com/mouseless-eth>"]
[workspace]
members = [
"crates/artemis-core",
"crates/strategy",
"sando-bin"
]

[dependencies]
ethers-flashbots = { git = "https://github.com/onbjerg/ethers-flashbots" }
ethers = {version = "2.0.0", features = ["abigen", "ws"]}
revm = {version = "3.0.0", features = ["ethersdb", "serde", "std"]}
dotenv = "0.15.0"
hashbrown = "0.14.0"
tokio = { version = "1", features = ["full"] }
log = "0.4.17"
url = "2.3.1"
dashmap = "5.4.0"
async-recursion = "1.0.2"
hex = "0.4.3"
serde = "1.0.145"
eyre = "0.6.8"
reqwest = "0.11.12"
time = "*"
indoc = "2"
indicatif = "0.17.1"
thiserror = "1.0.37"
fern = {version = "0.6", features = ["colored"]}
chrono = "0.4.23"
futures = "0.3.5"
colored = "2.0.0"

[profile.release]
debug = true

[dev-dependencies]
tokio-test = "*"
default-member = "sando-bin"
24 changes: 24 additions & 0 deletions bot/crates/artemis-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "artemis-core"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

## eth
ethers = { version = "2", features = ["ws", "rustls"]}
ethers-flashbots = { git = "https://github.com/onbjerg/ethers-flashbots", features = ["rustls"] }

## async
async-trait = "0.1.64"
futures = "0.3"
reqwest = { version = "0.11.14", default-features = false, features = ["rustls-tls"] }
tokio = { version = "1.18", features = ["full"] }
tokio-stream = { version = "0.1", features = ['sync'] }

## misc
anyhow = "1.0.70"
thiserror = "1.0.40"
tracing = "0.1.37"
57 changes: 57 additions & 0 deletions bot/crates/artemis-core/src/collectors/block_collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use crate::types::{Collector, CollectorStream};
use anyhow::Result;
use async_trait::async_trait;
use ethers::{
prelude::Middleware,
providers::PubsubClient,
types::{U256, U64},
};
use std::sync::Arc;
use tokio_stream::StreamExt;

/// A collector that listens for new blocks, and generates a stream of
/// [events](NewBlock) which contain the block number and hash.
pub struct BlockCollector<M> {
provider: Arc<M>,
}

/// A new block event, containing the block number and hash.
#[derive(Debug, Clone)]
pub struct NewBlock {
pub number: U64,
pub gas_used: U256,
pub gas_limit: U256,
pub base_fee_per_gas: U256,
pub timestamp: U256,
}

impl<M> BlockCollector<M> {
pub fn new(provider: Arc<M>) -> Self {
Self { provider }
}
}

/// Implementation of the [Collector](Collector) trait for the [BlockCollector](BlockCollector).
/// This implementation uses the [PubsubClient](PubsubClient) to subscribe to new blocks.
#[async_trait]
impl<M> Collector<NewBlock> for BlockCollector<M>
where
M: Middleware,
M::Provider: PubsubClient,
M::Error: 'static,
{
async fn get_event_stream(&self) -> Result<CollectorStream<'_, NewBlock>> {
let stream = self.provider.subscribe_blocks().await?;
let stream = stream.filter_map(|block| match block.number {
Some(number) => Some(NewBlock {
number,
gas_limit: block.gas_used,
gas_used: block.gas_limit,
base_fee_per_gas: block.base_fee_per_gas.unwrap_or_default(),
timestamp: block.timestamp,
}),
None => None,
});
Ok(Box::pin(stream))
}
}
38 changes: 38 additions & 0 deletions bot/crates/artemis-core/src/collectors/mempool_collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use async_trait::async_trait;

use ethers::{prelude::Middleware, providers::PubsubClient, types::Transaction};
use std::sync::Arc;

use crate::types::{Collector, CollectorStream};
use anyhow::Result;

/// A collector that listens for new transactions in the mempool, and generates a stream of
/// [events](Transaction) which contain the transaction.
pub struct MempoolCollector<M> {
provider: Arc<M>,
}

impl<M> MempoolCollector<M> {
pub fn new(provider: Arc<M>) -> Self {
Self { provider }
}
}

/// Implementation of the [Collector](Collector) trait for the [MempoolCollector](MempoolCollector).
/// This implementation uses the [PubsubClient](PubsubClient) to subscribe to new transactions.
#[async_trait]
impl<M> Collector<Transaction> for MempoolCollector<M>
where
M: Middleware,
M::Provider: PubsubClient,
M::Error: 'static,
{
async fn get_event_stream(&self) -> Result<CollectorStream<'_, Transaction>> {
let stream = self
.provider
.subscribe(["newPendingTransactionsWithBody"])
.await
.map_err(|_| anyhow::anyhow!("Failed to create mempool stream"))?;
Ok(Box::pin(stream))
}
}
9 changes: 9 additions & 0 deletions bot/crates/artemis-core/src/collectors/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//! Collectors are responsible for collecting data from external sources and
//! turning them into internal events. For example, a collector might listen to
//! a stream of new blocks, and turn them into a stream of `NewBlock` events.

/// This collector listens to a stream of new blocks.
pub mod block_collector;

/// This collector listens to a stream of new pending transactions.
pub mod mempool_collector;
124 changes: 124 additions & 0 deletions bot/crates/artemis-core/src/engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use tokio::sync::broadcast::{self, Sender};
use tokio::task::JoinSet;
use tokio_stream::StreamExt;
use tracing::{error, info};

use crate::types::{Collector, Executor, Strategy};

/// The main engine of Artemis. This struct is responsible for orchestrating the
/// data flow between collectors, strategies, and executors.
pub struct Engine<E, A> {
/// The set of collectors that the engine will use to collect events.
collectors: Vec<Box<dyn Collector<E>>>,

/// The set of strategies that the engine will use to process events.
strategies: Vec<Box<dyn Strategy<E, A>>>,

/// The set of executors that the engine will use to execute actions.
executors: Vec<Box<dyn Executor<A>>>,
}

impl<E, A> Engine<E, A> {
pub fn new() -> Self {
Self {
collectors: vec![],
strategies: vec![],
executors: vec![],
}
}
}

impl<E, A> Default for Engine<E, A> {
fn default() -> Self {
Self::new()
}
}

impl<E, A> Engine<E, A>
where
E: Send + Clone + 'static + std::fmt::Debug,
A: Send + Clone + 'static + std::fmt::Debug,
{
/// Adds a collector to be used by the engine.
pub fn add_collector(&mut self, collector: Box<dyn Collector<E>>) {
self.collectors.push(collector);
}

/// Adds a strategy to be used by the engine.
pub fn add_strategy(&mut self, strategy: Box<dyn Strategy<E, A>>) {
self.strategies.push(strategy);
}

/// Adds an executor to be used by the engine.
pub fn add_executor(&mut self, executor: Box<dyn Executor<A>>) {
self.executors.push(executor);
}

/// The core run loop of the engine. This function will spawn a thread for
/// each collector, strategy, and executor. It will then orchestrate the
/// data flow between them.
pub async fn run(self) -> Result<JoinSet<()>, Box<dyn std::error::Error>> {
let (event_sender, _): (Sender<E>, _) = broadcast::channel(512);
let (action_sender, _): (Sender<A>, _) = broadcast::channel(512);

let mut set = JoinSet::new();

// Spawn executors in separate threads.
for executor in self.executors {
let mut receiver = action_sender.subscribe();
set.spawn(async move {
info!("starting executor... ");
loop {
match receiver.recv().await {
Ok(action) => match executor.execute(action).await {
Ok(_) => {}
Err(e) => error!("error executing action: {}", e),
},
Err(e) => error!("error receiving action: {}", e),
}
}
});
}

// Spawn strategies in separate threads.
for mut strategy in self.strategies {
let mut event_receiver = event_sender.subscribe();
let action_sender = action_sender.clone();
strategy.sync_state().await?;

set.spawn(async move {
info!("starting strategy... ");
loop {
match event_receiver.recv().await {
Ok(event) => {
if let Some(action) = strategy.process_event(event).await {
match action_sender.send(action) {
Ok(_) => {}
Err(e) => error!("error sending action: {}", e),
}
}
}
Err(e) => error!("error receiving event: {}", e),
}
}
});
}

// Spawn collectors in separate threads.
for collector in self.collectors {
let event_sender = event_sender.clone();
set.spawn(async move {
info!("starting collector... ");
let mut event_stream = collector.get_event_stream().await.unwrap();
while let Some(event) = event_stream.next().await {
match event_sender.send(event) {
Ok(_) => {}
Err(e) => error!("error sending event: {}", e),
}
}
});
}

Ok(set)
}
}
76 changes: 76 additions & 0 deletions bot/crates/artemis-core/src/executors/flashbots_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use ethers::{
providers::Middleware, signers::Signer, types::transaction::eip2718::TypedTransaction,
};
use ethers_flashbots::{BundleRequest, FlashbotsMiddleware};
use reqwest::Url;
use tracing::error;

use crate::types::Executor;

/// A Flashbots executor that sends transactions to the Flashbots relay.
pub struct FlashbotsExecutor<M, S> {
/// The Flashbots middleware.
fb_client: FlashbotsMiddleware<Arc<M>, S>,

/// The signer to sign transactions before sending to the relay.
tx_signer: S,
}

/// A bundle of transactions to send to the Flashbots relay.
pub type FlashbotsBundle = Vec<TypedTransaction>;

impl<M: Middleware, S: Signer> FlashbotsExecutor<M, S> {
pub fn new(client: Arc<M>, tx_signer: S, relay_signer: S, relay_url: impl Into<Url>) -> Self {
let fb_client = FlashbotsMiddleware::new(client, relay_url, relay_signer);
Self {
fb_client,
tx_signer,
}
}
}

#[async_trait]
impl<M, S> Executor<FlashbotsBundle> for FlashbotsExecutor<M, S>
where
M: Middleware + 'static,
M::Error: 'static,
S: Signer + 'static,
{
/// Send a bundle to transactions to the Flashbots relay.
async fn execute(&self, action: FlashbotsBundle) -> Result<()> {
// Add txs to bundle.
let mut bundle = BundleRequest::new();

// Sign each transaction in bundle.
for tx in action {
let signature = self.tx_signer.sign_transaction(&tx).await?;
bundle.add_transaction(tx.rlp_signed(&signature));
}

// Simulate bundle.
let block_number = self.fb_client.get_block_number().await?;
let bundle = bundle
.set_block(block_number + 1)
.set_simulation_block(block_number)
.set_simulation_timestamp(0);

let simulated_bundle = self.fb_client.simulate_bundle(&bundle).await;

if let Err(simulate_error) = simulated_bundle {
error!("Error simulating bundle: {:?}", simulate_error);
}

// Send bundle.
let pending_bundle = self.fb_client.send_bundle(&bundle).await;

if let Err(send_error) = pending_bundle {
error!("Error sending bundle: {:?}", send_error);
}

Ok(())
}
}
Loading

0 comments on commit beefd8f

Please sign in to comment.