Skip to content

Commit

Permalink
add terminate
Browse files Browse the repository at this point in the history
  • Loading branch information
yiyuanliu committed Dec 28, 2021
1 parent 1e61af4 commit de5bd83
Showing 1 changed file with 32 additions and 3 deletions.
35 changes: 32 additions & 3 deletions madsim-std/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ use std::{
io,
net::{SocketAddr, ToSocketAddrs},
sync::{mpsc, Arc, Mutex},
thread::JoinHandle,
time::Duration,
};

use tokio::sync::mpsc::UnboundedSender;

mod context;
pub mod fs;
pub mod net;
Expand Down Expand Up @@ -62,6 +65,7 @@ impl Runtime {
let local_handle = LocalHandle {
handle: rt.handle().clone(),
net: handle.net.create_host(&rt, &local, "127.0.0.1:0").unwrap(),
term: None,
};
Runtime {
rt,
Expand Down Expand Up @@ -161,9 +165,24 @@ impl Handle {
pub struct LocalHandle {
handle: tokio::runtime::Handle,
net: net::NetLocalHandle,
term: Option<TermHandle>,
}

#[derive(Clone)]
struct TermHandle {
tx: UnboundedSender<()>,
join: Arc<Mutex<Option<JoinHandle<()>>>>,
}

impl LocalHandle {
/// SIGTERM, can only call once.
pub async fn terminate(&mut self) {
self.term.as_mut().map(|term| {
term.tx.send(()).unwrap();
term.join.lock().unwrap().take().unwrap().join().unwrap();
});
}

/// Spawn a future onto the runtime.
pub fn spawn<F>(&self, future: F) -> task::Task<F::Output>
where
Expand All @@ -186,18 +205,28 @@ impl LocalHandle {
// create a channel to receive the local_handle from the new thread
let (sender, recver) = mpsc::channel();
let handle = handle.clone();
std::thread::spawn(move || {
let (kill_tx, mut kill_rx) = tokio::sync::mpsc::unbounded_channel();
let join = std::thread::spawn(move || {
let local = tokio::task::LocalSet::new();
let local_handle = LocalHandle {
handle: rt.handle().clone(),
net: handle.net.create_host(&rt, &local, addr).unwrap(),
term: Some(TermHandle {
tx: kill_tx,
join: Arc::new(Mutex::new(None)),
}),
};
sender.send(local_handle.clone()).ok().unwrap();

let _guard = crate::context::enter_local(local_handle);
local.block_on(&rt, futures::future::pending::<()>());
local.block_on(&rt, async move {
kill_rx.recv().await;
});
});
let mut handle = recver.recv().unwrap();
handle.term.as_mut().map(|term| {
term.join = Arc::new(Mutex::new(Some(join)));
});
let handle = recver.recv().unwrap();
Ok(handle)
}
}
Expand Down

0 comments on commit de5bd83

Please sign in to comment.