Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V4 leak fix #288

Merged
merged 12 commits into from
Apr 14, 2021
Next Next commit
Fix ScheduleTimer thread leak. See issue #287 on notify
  • Loading branch information
Awfa committed Mar 17, 2021
commit 8ff030a16a62104bcbc8c080b27d5ef5de6b6456
125 changes: 55 additions & 70 deletions src/debounce/timer.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use super::super::{op, DebouncedEvent};

use std::collections::VecDeque;
use std::ops::DerefMut;
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::{
atomic::{self, AtomicBool},
Arc, Condvar, Mutex,
};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use std::{collections::VecDeque, sync::MutexGuard};
use std::{ops::DerefMut, thread::JoinHandle};

use debounce::{OperationsBuffer, OperationsBufferInner};

Expand All @@ -20,45 +17,53 @@ struct ScheduledEvent {
path: PathBuf,
}

#[derive(Default)]
struct WorkerSharedState {
is_stopped: bool,
events: VecDeque<ScheduledEvent>,
}

struct ScheduleWorker {
new_event_trigger: Arc<Condvar>,
stop_trigger: Arc<Condvar>,
events: Arc<Mutex<VecDeque<ScheduledEvent>>>,
state: Arc<(Mutex<WorkerSharedState>, Condvar)>,
tx: mpsc::Sender<DebouncedEvent>,
operations_buffer: OperationsBuffer,
stopped: Arc<AtomicBool>,
}

impl ScheduleWorker {
fn fire_due_events(&self, now: Instant) -> Option<Instant> {
fn fire_due_events<'a>(
&'a self,
now: Instant,
state: MutexGuard<'a, WorkerSharedState>,
) -> (Option<Instant>, MutexGuard<'a, WorkerSharedState>) {
// simple deadlock avoidance loop.
let (mut events, mut op_buf) = loop {
let events = self.events.lock().unwrap();
let mut state = Some(state);
let (mut state, mut op_buf) = loop {
let state = state.take().unwrap_or_else(|| self.state.0.lock().unwrap());

// To avoid deadlock, we do a `try_lock`, and on `WouldBlock`, we unlock the
// events Mutex, and retry after yielding.
match self.operations_buffer.try_lock() {
Ok(op_buf) => break (events, op_buf),
Err(::std::sync::TryLockError::Poisoned { .. }) => return None,
Ok(op_buf) => break (state, op_buf),
Err(::std::sync::TryLockError::Poisoned { .. }) => return (None, state),
Err(::std::sync::TryLockError::WouldBlock) => {
// drop the lock before yielding to give other threads a chance to complete
// their work.
drop(events);
drop(state);
::std::thread::yield_now();
}
}
};
while let Some(event) = events.pop_front() {
while let Some(event) = state.events.pop_front() {
if event.when <= now {
self.fire_event(event, &mut op_buf)
} else {
// not due yet, put it back
let next_when = event.when;
events.push_front(event);
return Some(next_when);
state.events.push_front(event);
return (Some(next_when), state);
}
}
None
(None, state)
}

fn fire_event(
Expand Down Expand Up @@ -97,44 +102,32 @@ impl ScheduleWorker {
}

fn run(&mut self) {
let m = Mutex::new(());

// Unwrapping is safe because the mutex can't be poisoned,
// since we just created it.
let mut g = m.lock().unwrap();

let mut state = self.state.0.lock().unwrap();
loop {
let now = Instant::now();
let next_when = self.fire_due_events(now);
let (next_when, state_out) = self.fire_due_events(now, state);
state = state_out;

if self.stopped.load(atomic::Ordering::SeqCst) {
if state.is_stopped {
break;
}

// Unwrapping is safe because the mutex can't be poisoned,
// since we haven't shared it with another thread.
g = if let Some(next_when) = next_when {
state = if let Some(next_when) = next_when {
// wait for stop notification or timeout to send next event
self.stop_trigger
.wait_timeout(g, next_when - now)
.unwrap()
.0
self.state.1.wait_timeout(state, next_when - now).unwrap().0
} else {
// no pending events
// wait for new event, to check when it should be send and then wait to send it
self.new_event_trigger.wait(g).unwrap()
self.state.1.wait(state).unwrap()
};
}
}
}

pub struct WatchTimer {
counter: u64,
new_event_trigger: Arc<Condvar>,
stop_trigger: Arc<Condvar>,
state: Arc<(Mutex<WorkerSharedState>, Condvar)>,
delay: Duration,
events: Arc<Mutex<VecDeque<ScheduledEvent>>>,
stopped: Arc<AtomicBool>,
}

impl WatchTimer {
Expand All @@ -143,64 +136,56 @@ impl WatchTimer {
operations_buffer: OperationsBuffer,
delay: Duration,
) -> WatchTimer {
let events = Arc::new(Mutex::new(VecDeque::new()));
let new_event_trigger = Arc::new(Condvar::new());
let stop_trigger = Arc::new(Condvar::new());
let stopped = Arc::new(AtomicBool::new(false));

let worker_new_event_trigger = new_event_trigger.clone();
let worker_stop_trigger = stop_trigger.clone();
let worker_events = events.clone();
let worker_stopped = stopped.clone();
let state = Arc::new((Mutex::new(WorkerSharedState::default()), Condvar::new()));

let worker_state = state.clone();
thread::spawn(move || {
ScheduleWorker {
new_event_trigger: worker_new_event_trigger,
stop_trigger: worker_stop_trigger,
events: worker_events,
state: worker_state,
tx,
operations_buffer,
stopped: worker_stopped,
}
.run();
});

WatchTimer {
counter: 0,
new_event_trigger,
stop_trigger,
state,
delay,
events,
stopped,
}
}

pub fn schedule(&mut self, path: PathBuf) -> u64 {
self.counter = self.counter.wrapping_add(1);

self.events.lock().unwrap().push_back(ScheduledEvent {
id: self.counter,
when: Instant::now() + self.delay,
path,
});

self.new_event_trigger.notify_one();
{
let mut state = self.state.0.lock().unwrap();
state.events.push_back(ScheduledEvent {
id: self.counter,
when: Instant::now() + self.delay,
path,
});
}
self.state.1.notify_one();

self.counter
}

pub fn ignore(&self, id: u64) {
let mut events = self.events.lock().unwrap();
let index = events.iter().rposition(|e| e.id == id);
let mut state = self.state.0.lock().unwrap();
let index = state.events.iter().rposition(|e| e.id == id);
if let Some(index) = index {
events.remove(index);
state.events.remove(index);
}
}
}

impl Drop for WatchTimer {
fn drop(&mut self) {
self.stopped.store(true, atomic::Ordering::SeqCst);
self.stop_trigger.notify_one();
self.new_event_trigger.notify_one();
{
let mut state = self.state.0.lock().unwrap();
state.is_stopped = true;
}
self.state.1.notify_one();
}
}