Skip to content

Commit

Permalink
Refactor debounced API
Browse files Browse the repository at this point in the history
- Move debounce::Event to lib.rs and rename it to DebouncedEvent
- Rename Event to RawEvent
- Rename Watcher::new to Watcher::new_raw and Watcher::debounced to Watcher::new_debounced
  • Loading branch information
dfaust committed Oct 18, 2016
1 parent 8d98175 commit e1e1203
Show file tree
Hide file tree
Showing 18 changed files with 458 additions and 390 deletions.
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn watch() -> notify::Result<()> {

// Automatically select the best implementation for your platform.
// You can also access each implementation directly e.g. INotifyWatcher.
let mut watcher: RecommendedWatcher = try!(Watcher::new(tx));
let mut watcher: RecommendedWatcher = try!(Watcher::new_raw(tx));

// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
Expand All @@ -43,7 +43,7 @@ fn watch() -> notify::Result<()> {
// for example to handle I/O.
loop {
match rx.recv() {
Ok(notify::Event{ path: Some(path),op:Ok(op) }) => {
Ok(notify::RawEvent{ path: Some(path),op:Ok(op) }) => {
println!("{:?} {:?}", op, path);
},
Err(e) => println!("watch error {}", e),
Expand All @@ -63,9 +63,11 @@ fn main() {

### From v2.x to v3.x

* `notify` now provides two APIs, a _raw_ and a _debounced_ API. In order to keep the old behavior, use the _raw_ API.
Replace every occurrence of `Watcher::new` with `Watcher::new_raw` and `Event` with `RawEvent`. Or see the docs for how to use the _debounced_ API.
* The watch(..) function used to watch a file or a directory now takes an additional argument.
In order to use that argument you first need to import notify::RecursiveMode via the `use` keyword.
To keep the old behavior, use RecursiveMode::Recursive, for more information see the docs.
In order to use that argument you first need to import `RecursiveMode` via the `use` keyword.
To keep the old behavior, use `RecursiveMode::Recursive`, for more information see the docs.
* The inotify back-end used to add watches recursively to a directory but it wouldn't remove them recursively.
From v3.0.0 on inotify removes watches recursively if they were added recursively.
* The inotify back-end didn't use to watch newly created directories.
Expand Down
4 changes: 2 additions & 2 deletions examples/debounced.rs → examples/monitor_debounced.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fn watch<P: AsRef<Path>>(path: P) -> notify::Result<()> {

// Automatically select the best implementation for your platform.
// You can also access each implementation directly e.g. INotifyWatcher.
let mut watcher: RecommendedWatcher = try!(Watcher::debounced(tx, Duration::from_secs(2)));
let mut watcher: RecommendedWatcher = try!(Watcher::new_debounced(tx, Duration::from_secs(2)));

// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
Expand All @@ -22,7 +22,7 @@ fn watch<P: AsRef<Path>>(path: P) -> notify::Result<()> {
loop {
match rx.recv() {
Ok(event) => println!("{:?}", event),
Err(e) => println!("watch error: {}", e),
Err(e) => println!("watch error: {:?}", e),
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions examples/monitor.rs → examples/monitor_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ fn watch<P: AsRef<Path>>(path: P) -> notify::Result<()> {

// Automatically select the best implementation for your platform.
// You can also access each implementation directly e.g. INotifyWatcher.
let mut watcher: RecommendedWatcher = try!(Watcher::new(tx));
let mut watcher: RecommendedWatcher = try!(Watcher::new_raw(tx));

// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
Expand All @@ -20,9 +20,9 @@ fn watch<P: AsRef<Path>>(path: P) -> notify::Result<()> {
// for example to handle I/O.
loop {
match rx.recv() {
Ok(notify::Event{path: Some(path), op: Ok(op), cookie}) => println!("{:?} {:?} ({:?})", op, path, cookie),
Ok(event) => println!("broken event: {:?}", event),
Err(e) => println!("watch error: {:?}", e),
Ok(notify::RawEvent{path: Some(path), op: Ok(op), cookie}) => println!("{:?} {:?} ({:?})", op, path, cookie),
Ok(event) => println!("broken event: {:?}", event),
Err(e) => println!("watch error: {:?}", e),
}
}
}
Expand Down
91 changes: 17 additions & 74 deletions src/debounce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

mod timer;

use super::{op, Error, Event as NotifyEvent};
use super::{op, RawEvent, DebouncedEvent};

use self::timer::WatchTimer;

Expand All @@ -14,86 +14,29 @@ use std::time::Duration;

pub type OperationsBuffer = Arc<Mutex<HashMap<PathBuf, (Option<op::Op>, Option<PathBuf>, Option<u64>)>>>;

#[derive(Debug)]
/// Events emitted by `notify` in _debounced_ mode.
pub enum Event {
/// `NoticeWrite` is emitted imediatelly after the first write event for the path.
///
/// If you are reading from that file, you should probably close it imediatelly and discard all data you read from it.
NoticeWrite(PathBuf),
/// `NoticeRemove` is emitted imediatelly after a remove or rename event for the path.
///
/// The file will continue to exist until its last file handle is closed.
NoticeRemove(PathBuf),
/// `Create` is emitted when a file or directory has been created and no events were detected for the path within the specified time frame.
///
/// `Create` events have a higher priority than `Write` and `Chmod`.
/// These events will not be emitted if they are detected before the `Create` event has been emitted.
Create(PathBuf),
/// `Write` is emitted when a file has been written to and no events were detected for the path within the specified time frame.
///
/// `Write` events have a higher priority than `Chmod`.
/// `Chmod` will not be emitted if it's detected before the `Write` event has been emitted.
///
/// Upon receiving a `Create` event for a directory, it is necessary to scan the newly created directory for contents.
/// The directory can contain files or directories if those contents were created before the directory could be watched,
/// or if the directory was moved into the watched directory.
Write(PathBuf),
/// `Chmod` is emitted when attributes have been changed and no events were detected for the path within the specified time frame.
Chmod(PathBuf),
/// `Remove` is emitted when a file or directory has been removed and no events were detected for the path within the specified time frame.
Remove(PathBuf),
/// `Rename` is emitted when a file or directory has been moved within a watched directory and no events were detected for the new path within the specified time frame.
///
/// The first path contains the source, the second path the destination.
Rename(PathBuf, PathBuf),
/// `Rescan` is emitted imediatelly after a problem has been detected that makes it necessary to re-scan the watched directories.
Rescan,
/// `Error` is emitted imediatelly after a error has been detected.
///
/// This event may contain a path for which the error was detected.
Error(Error, Option<PathBuf>),
}

impl PartialEq for Event {
fn eq(&self, other: &Event) -> bool {
match (self, other) {
(&Event::NoticeWrite(ref a), &Event::NoticeWrite(ref b)) |
(&Event::NoticeRemove(ref a), &Event::NoticeRemove(ref b)) |
(&Event::Create(ref a), &Event::Create(ref b)) |
(&Event::Write(ref a), &Event::Write(ref b)) |
(&Event::Chmod(ref a), &Event::Chmod(ref b)) |
(&Event::Remove(ref a), &Event::Remove(ref b)) => a == b,
(&Event::Rename(ref a1, ref a2), &Event::Rename(ref b1, ref b2)) => (a1 == b1 && a2 == b2),
(&Event::Rescan, &Event::Rescan) => true,
_ => false,
}
}
}

pub enum EventTx {
Raw {
tx: mpsc::Sender<NotifyEvent>,
tx: mpsc::Sender<RawEvent>,
},
Debounced {
tx: mpsc::Sender<Event>,
tx: mpsc::Sender<DebouncedEvent>,
debounce: Debounce,
},
DebouncedTx {
tx: mpsc::Sender<Event>,
tx: mpsc::Sender<DebouncedEvent>,
},
}

impl EventTx {
pub fn send(&mut self, event: NotifyEvent) {
pub fn send(&mut self, event: RawEvent) {
match *self {
EventTx::Raw { ref tx } => {
let _ = tx.send(event);
}
EventTx::Debounced { ref tx, ref mut debounce } => {
match (event.path, event.op, event.cookie) {
(None, Ok(op::RESCAN), None) => {
let _ = tx.send(Event::Rescan);
let _ = tx.send(DebouncedEvent::Rescan);
}
(Some(path), Ok(op), cookie) => {
debounce.event(path, op, cookie);
Expand All @@ -102,14 +45,14 @@ impl EventTx {
// TODO panic!("path is None: {:?} ({:?})", _op, _cookie);
}
(path, Err(e), _) => {
let _ = tx.send(Event::Error(e, path));
let _ = tx.send(DebouncedEvent::Error(e, path));
}
}
}
EventTx::DebouncedTx { ref tx } => {
match (event.path, event.op, event.cookie) {
(None, Ok(op::RESCAN), None) => {
let _ = tx.send(Event::Rescan);
let _ = tx.send(DebouncedEvent::Rescan);
}
(Some(_path), Ok(_op), _cookie) => {
// TODO debounce.event(_path, _op, _cookie);
Expand All @@ -118,7 +61,7 @@ impl EventTx {
// TODO panic!("path is None: {:?} ({:?})", _op, _cookie);
}
(path, Err(e), _) => {
let _ = tx.send(Event::Error(e, path));
let _ = tx.send(DebouncedEvent::Error(e, path));
}
}
}
Expand All @@ -127,15 +70,15 @@ impl EventTx {
}

pub struct Debounce {
tx: mpsc::Sender<Event>,
tx: mpsc::Sender<DebouncedEvent>,
operations_buffer: OperationsBuffer,
rename_path: Option<PathBuf>,
rename_cookie: Option<u32>,
timer: WatchTimer,
}

impl Debounce {
pub fn new(delay: Duration, tx: mpsc::Sender<Event>) -> Debounce {
pub fn new(delay: Duration, tx: mpsc::Sender<DebouncedEvent>) -> Debounce {
let operations_buffer: OperationsBuffer = Arc::new(Mutex::new(HashMap::new()));

// spawns new thread
Expand Down Expand Up @@ -187,7 +130,7 @@ impl Debounce {
Some(op::WRITE) | // change to remove event
Some(op::CHMOD) => { // change to remove event
*operation = Some(op::REMOVE);
let _ = self.tx.send(Event::NoticeRemove(path.clone()));
let _ = self.tx.send(DebouncedEvent::NoticeRemove(path.clone()));
restart_timer(timer_id, path, &mut self.timer);
}
Some(op::RENAME) => {
Expand All @@ -212,7 +155,7 @@ impl Debounce {

pub fn event(&mut self, path: PathBuf, mut op: op::Op, cookie: Option<u32>) {
if op.contains(op::RESCAN) {
let _ = self.tx.send(Event::Rescan);
let _ = self.tx.send(DebouncedEvent::Rescan);
}

if self.rename_path.is_some() {
Expand Down Expand Up @@ -265,7 +208,7 @@ impl Debounce {
Some(op::RENAME) | // file has been renamed before, upgrade to write event
None => { // operations_buffer entry didn't exist
*operation = Some(op::WRITE);
let _ = self.tx.send(Event::NoticeWrite(path.clone()));
let _ = self.tx.send(DebouncedEvent::NoticeWrite(path.clone()));
restart_timer(timer_id, path.clone(), &mut self.timer);
}
// writing to a deleted file is impossible
Expand Down Expand Up @@ -339,13 +282,13 @@ impl Debounce {
}
Some(op::WRITE) | // keep write event
Some(op::CHMOD) => { // keep chmod event
let _ = self.tx.send(Event::NoticeRemove(path.clone()));
let _ = self.tx.send(DebouncedEvent::NoticeRemove(path.clone()));
restart_timer(timer_id, path.clone(), &mut self.timer);
}
None => {
// operations_buffer entry didn't exist
*operation = Some(op::RENAME);
let _ = self.tx.send(Event::NoticeRemove(path.clone()));
let _ = self.tx.send(DebouncedEvent::NoticeRemove(path.clone()));
restart_timer(timer_id, path.clone(), &mut self.timer);
}
// renaming a deleted file is impossible
Expand Down Expand Up @@ -373,7 +316,7 @@ impl Debounce {
Some(op::CHMOD) | // change to remove event
None => { // operations_buffer entry didn't exist
*operation = Some(op::REMOVE);
let _ = self.tx.send(Event::NoticeRemove(path.clone()));
let _ = self.tx.send(DebouncedEvent::NoticeRemove(path.clone()));
restart_timer(timer_id, path.clone(), &mut self.timer);
}
Some(op::RENAME) => {
Expand Down
24 changes: 12 additions & 12 deletions src/debounce/timer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::super::op;
use super::super::{op, DebouncedEvent};

use std::sync::mpsc;
use std::thread;
Expand All @@ -8,7 +8,7 @@ use std::collections::{BinaryHeap, HashSet};
use std::path::PathBuf;
use std::cmp::Ordering;

use debounce::{self, OperationsBuffer};
use debounce::OperationsBuffer;

enum Action {
Schedule(ScheduledEvent),
Expand Down Expand Up @@ -39,12 +39,12 @@ struct ScheduleWorker {
request_source: mpsc::Receiver<Action>,
schedule: BinaryHeap<ScheduledEvent>,
ignore: HashSet<u64>,
tx: mpsc::Sender<debounce::Event>,
tx: mpsc::Sender<DebouncedEvent>,
operations_buffer: OperationsBuffer,
}

impl ScheduleWorker {
fn new(trigger: Arc<Condvar>, request_source: mpsc::Receiver<Action>, tx: mpsc::Sender<debounce::Event>, operations_buffer: OperationsBuffer) -> ScheduleWorker {
fn new(trigger: Arc<Condvar>, request_source: mpsc::Receiver<Action>, tx: mpsc::Sender<DebouncedEvent>, operations_buffer: OperationsBuffer) -> ScheduleWorker {
ScheduleWorker{
trigger: trigger,
request_source: request_source,
Expand Down Expand Up @@ -86,18 +86,18 @@ impl ScheduleWorker {
if let Some((op, from_path, _)) = op_buf.remove(&path) {
let is_partial_rename = from_path.is_none();
if let Some(from_path) = from_path {
self.tx.send(debounce::Event::Rename(from_path, path.clone())).unwrap();
self.tx.send(DebouncedEvent::Rename(from_path, path.clone())).unwrap();
}
let message = match op {
Some(op::CREATE) => Some(debounce::Event::Create(path)),
Some(op::WRITE) => Some(debounce::Event::Write(path)),
Some(op::CHMOD) => Some(debounce::Event::Chmod(path)),
Some(op::REMOVE) => Some(debounce::Event::Remove(path)),
Some(op::CREATE) => Some(DebouncedEvent::Create(path)),
Some(op::WRITE) => Some(DebouncedEvent::Write(path)),
Some(op::CHMOD) => Some(DebouncedEvent::Chmod(path)),
Some(op::REMOVE) => Some(DebouncedEvent::Remove(path)),
Some(op::RENAME) if is_partial_rename => {
if path.exists() {
Some(debounce::Event::Create(path))
Some(DebouncedEvent::Create(path))
} else {
Some(debounce::Event::Remove(path))
Some(DebouncedEvent::Remove(path))
}
},
_ => None
Expand Down Expand Up @@ -155,7 +155,7 @@ pub struct WatchTimer {
}

impl WatchTimer {
pub fn new(tx: mpsc::Sender<debounce::Event>, operations_buffer: OperationsBuffer, delay: Duration) -> WatchTimer {
pub fn new(tx: mpsc::Sender<DebouncedEvent>, operations_buffer: OperationsBuffer, delay: Duration) -> WatchTimer {
let (schedule_tx, schedule_rx) = mpsc::channel();
let trigger = Arc::new(Condvar::new());

Expand Down
Loading

0 comments on commit e1e1203

Please sign in to comment.