Skip to content

Commit

Permalink
Allow in-process clients to bypass the network
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed Aug 12, 2024
1 parent e3434fd commit b80d16c
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 25 deletions.
8 changes: 4 additions & 4 deletions common/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct CharacterState {
pub orientation: na::UnitQuaternion<f32>,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Spawns {
pub step: Step,
pub spawns: Vec<(EntityId, Vec<Component>)>,
Expand Down Expand Up @@ -83,21 +83,21 @@ pub struct BlockUpdate {
pub consumed_entity: Option<EntityId>,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SerializedVoxelData {
/// Dense 3D array of 16-bit material tags for all voxels in this chunk
pub inner: Vec<u8>,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum Component {
Character(Character),
Position(Position),
Material(Material),
Inventory(Inventory),
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FreshNode {
/// The side joining the new node to `parent`
pub side: dodeca::Side,
Expand Down
122 changes: 101 additions & 21 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod sim;

use std::{net::UdpSocket, sync::Arc, time::Instant};

use anyhow::{Context, Error, Result};
use anyhow::{anyhow, Context, Error, Result};
use hecs::Entity;
use quinn::rustls::pki_types::{CertificateDer, PrivateKeyDer};
use slotmap::DenseSlotMap;
Expand Down Expand Up @@ -38,6 +38,8 @@ pub struct Server {

new_clients_send: mpsc::UnboundedSender<(quinn::Connection, proto::ClientHello)>,
new_clients_recv: mpsc::UnboundedReceiver<(quinn::Connection, proto::ClientHello)>,
client_events_send: mpsc::Sender<(ClientId, ClientEvent)>,
client_events_recv: mpsc::Receiver<(ClientId, ClientEvent)>,
}

impl Server {
Expand All @@ -55,6 +57,7 @@ impl Server {
info!(address = %endpoint.local_addr().unwrap(), "listening");

let (new_clients_send, new_clients_recv) = mpsc::unbounded_channel();
let (client_events_send, client_events_recv) = mpsc::channel(128);

let cfg = Arc::new(cfg);
Ok(Self {
Expand All @@ -66,19 +69,74 @@ impl Server {

new_clients_send,
new_clients_recv,
client_events_send,
client_events_recv,
})
}

pub fn connect(&mut self, hello: proto::ClientHello, mut backend: HandleBackend) -> Result<()> {
let snapshot = Arc::new(self.sim.snapshot());
let (id, entity) = self
.sim
.activate_or_spawn_character(&hello)
.ok_or_else(|| anyhow!("could not spawn {} due to name conflict", hello.name))?;
let (ordered_send, mut ordered_recv) = mpsc::channel(32);
ordered_send.try_send(snapshot).unwrap();
let (unordered_send, mut unordered_recv) = mpsc::channel(32);
let client_id = self.clients.insert(Client {
conn: None,
character: entity,
ordered: ordered_send,
unordered: unordered_send,
latest_input_received: 0,
latest_input_processed: 0,
inputs: InputQueue::new(),
});

backend
.incoming
.send(Message::Hello(proto::ServerHello {
character: id,
sim_config: (*self.cfg).clone(),
}))
.unwrap();

// Adapt channels. TODO: Make this unnecessary.
let client_events_send = self.client_events_send.clone();
tokio::spawn(async move {
while let Some(msg) = backend.outgoing.recv().await {
_ = client_events_send
.send((client_id, ClientEvent::Command(msg)))
.await;
}
});
tokio::spawn({
let incoming_send = backend.incoming.clone();
async move {
while let Some(msg) = ordered_recv.recv().await {
_ = incoming_send.send(Message::Spawns(proto::Spawns::clone(&*msg)));
}
}
});
tokio::spawn(async move {
while let Some(msg) = unordered_recv.recv().await {
_ = backend.incoming.send(Message::StateDelta(msg));
}
});

info!(id = ?client_id.0, "connected locally");
Ok(())
}

pub async fn run(mut self) {
let mut ticks = tokio::time::interval(self.cfg.step_interval);
let mut incoming = self.handle_incoming();
let (client_events_send, mut client_events) = mpsc::channel(128);
loop {
tokio::select! {
_ = ticks.tick() => { self.on_step(); },
Some(conn) = incoming.recv() => { self.on_connect(conn); }
Some((id, event)) = client_events.recv() => { self.on_client_event(id, event); }
Some((conn, hello)) = self.new_clients_recv.recv() => { self.on_client(conn, hello, client_events_send.clone()); }
Some((id, event)) = self.client_events_recv.recv() => { self.on_client_event(id, event); }
Some((conn, hello)) = self.new_clients_recv.recv() => { self.on_client(conn, hello); }
}
}
}
Expand Down Expand Up @@ -139,12 +197,19 @@ impl Server {
}
}
for client_id in overran {
error!("dropping slow client {:?}", client_id.0);
self.clients[client_id].conn.close(
connection_error_codes::STREAM_ERROR,
b"client reading too slowly",
);
self.cleanup_client(client_id);
match self.clients[client_id].conn {
Some(ref conn) => {
error!("dropping slow client {:?}", client_id.0);
conn.close(
connection_error_codes::STREAM_ERROR,
b"client reading too slowly",
);
self.cleanup_client(client_id);
}
None => {
warn!("slow local client {:?}", client_id.0);
}
}
}

// Save the world. Could be less frequent if it becomes a bottleneck.
Expand All @@ -163,9 +228,6 @@ impl Server {
match event {
ClientEvent::Lost(e) => {
error!("lost: {:#}", e);
client
.conn
.close(connection_error_codes::CONNECTION_LOST, b"");
self.cleanup_client(client_id);
}
ClientEvent::Command(cmd) => {
Expand Down Expand Up @@ -202,12 +264,7 @@ impl Server {
});
}

fn on_client(
&mut self,
connection: quinn::Connection,
hello: proto::ClientHello,
mut send: mpsc::Sender<(ClientId, ClientEvent)>,
) {
fn on_client(&mut self, connection: quinn::Connection, hello: proto::ClientHello) {
let snapshot = Arc::new(self.sim.snapshot());
let Some((id, entity)) = self.sim.activate_or_spawn_character(&hello) else {
error!("could not spawn {} due to name conflict", hello.name);
Expand All @@ -218,7 +275,7 @@ impl Server {
ordered_send.try_send(snapshot).unwrap();
let (unordered_send, unordered_recv) = mpsc::channel(32);
let client_id = self.clients.insert(Client {
conn: connection.clone(),
conn: Some(connection.clone()),
character: entity,
ordered: ordered_send,
unordered: unordered_send,
Expand All @@ -238,6 +295,7 @@ impl Server {
let _ = drive_send(connection, server_hello, unordered_recv, ordered_recv).await;
}
});
let mut send = self.client_events_send.clone();
tokio::spawn(async move {
if let Err(e) = drive_recv(client_id, connection, &mut send).await {
// drive_recv returns an error when any connection-terminating issue occurs, so we
Expand Down Expand Up @@ -322,7 +380,7 @@ slotmap::new_key_type! {
}

struct Client {
conn: quinn::Connection,
conn: Option<quinn::Connection>,
character: Entity,
ordered: mpsc::Sender<Ordered>,
unordered: mpsc::Sender<Unordered>,
Expand All @@ -346,6 +404,28 @@ pub struct Handle {
pub outgoing: mpsc::UnboundedSender<proto::Command>,
}

impl Handle {
pub fn loopback() -> (Self, HandleBackend) {
let (incoming_send, incoming_recv) = mpsc::unbounded_channel();
let (outgoing_send, outgoing_recv) = mpsc::unbounded_channel();
(
Self {
incoming: incoming_recv,
outgoing: outgoing_send,
},
HandleBackend {
incoming: incoming_send,
outgoing: outgoing_recv,
},
)
}
}

pub struct HandleBackend {
incoming: mpsc::UnboundedSender<Message>,
outgoing: mpsc::UnboundedReceiver<proto::Command>,
}

#[derive(Debug)]
pub enum Message {
Hello(proto::ServerHello),
Expand Down

0 comments on commit b80d16c

Please sign in to comment.