//! ## File system watcher //! //! A watcher for file system paths, which reports changes on local fs mod change; // -- export use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::mpsc::{Receiver, RecvTimeoutError, channel}; use std::time::Duration; pub use change::FsChange; use notify::{ Config, Error as WatcherError, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, }; use thiserror::Error; use crate::utils::path as path_utils; type FsWatcherResult = Result; /// Describes an error returned by the `FsWatcher` #[derive(Debug, Error)] pub enum FsWatcherError { #[error("unable to unwatch this path, since is not currently watched")] PathNotWatched, #[error("unable to watch path, since it's already watched")] PathAlreadyWatched, #[error("unknown event: {0}")] UnknownEvent(&'static str), #[error("worker error: {0}")] WorkerError(WatcherError), } impl From for FsWatcherError { fn from(err: WatcherError) -> Self { Self::WorkerError(err) } } /// Describes an event that can be received from the `FsWatcher` #[derive(Debug, Clone, PartialEq, Eq)] enum FsWatcherEvent { Rename { source: PathBuf, dest: PathBuf }, Remove(PathBuf), Create(PathBuf), Modify(PathBuf), Other, } impl TryFrom for FsWatcherEvent { type Error = &'static str; fn try_from(ev: Event) -> Result { match ev.kind { EventKind::Any | EventKind::Access(_) | EventKind::Other => Ok(Self::Other), EventKind::Create(_) => { if ev.paths.len() == 2 { Ok(Self::Rename { source: ev.paths[0].clone(), dest: ev.paths[1].clone(), }) } else if let Some(p) = ev.paths.first() { Ok(Self::Create(p.clone())) } else { Err("No path found") } } EventKind::Modify(_) => { if ev.paths.len() == 2 { Ok(Self::Rename { source: ev.paths[0].clone(), dest: ev.paths[1].clone(), }) } else if let Some(p) = ev.paths.first() { Ok(Self::Modify(p.clone())) } else { Err("No path found") } } EventKind::Remove(_) => { if let Some(p) = ev.paths.first() { Ok(Self::Remove(p.clone())) } else { Err("No path found") } } } } } /// File system watcher pub struct FsWatcher { paths: HashMap, receiver: Receiver>, watcher: RecommendedWatcher, } impl FsWatcher { /// Initialize a new `FsWatcher` pub fn init(delay: Duration) -> FsWatcherResult { let (tx, receiver) = channel(); Ok(Self { paths: HashMap::default(), receiver, watcher: RecommendedWatcher::new(tx, Config::default().with_poll_interval(delay))?, }) } /// Poll searching for the first available disk change pub fn poll(&self) -> FsWatcherResult> { let res = match self.receiver.recv_timeout(Duration::from_millis(1)) { Ok(res) => res, Err(RecvTimeoutError::Timeout) => return Ok(None), Err(RecvTimeoutError::Disconnected) => panic!("File watcher died"), }; // convert event to FsChange let event = res .map(FsWatcherEvent::try_from) .map_err(FsWatcherError::from)? .map_err(FsWatcherError::UnknownEvent)?; match event { FsWatcherEvent::Rename { source, dest } => Ok(self.build_fs_move(source, dest)), FsWatcherEvent::Remove(p) => Ok(self.build_fs_remove(p)), FsWatcherEvent::Modify(p) | FsWatcherEvent::Create(p) => Ok(self.build_fs_update(p)), FsWatcherEvent::Other => { debug!("unknown event"); Ok(None) } } } /// Watch `local` path on localhost pub fn watch(&mut self, local: &Path, remote: &Path) -> FsWatcherResult<()> { // Start watcher if unwatched if !self.watched(local) { self.watcher.watch(local, RecursiveMode::Recursive)?; // Insert new path to paths self.paths.insert(local.to_path_buf(), remote.to_path_buf()); Ok(()) } else { Err(FsWatcherError::PathAlreadyWatched) } } /// Returns whether `path` is currently watched. /// This method looks also in path ancestors. /// /// Example: /// if `/home` is watched, then if we call `watched("/home/foo/file.txt")` will return `true` pub fn watched(&self, path: &Path) -> bool { self.find_watched_path(path).is_some() } /// Returns the list of watched paths pub fn watched_paths(&self) -> Vec<&Path> { Vec::from_iter(self.paths.keys().map(|x| x.as_path())) } /// Unwatch provided path. /// When unwatching the path, it searches for the ancestor watched path if any. /// Returns the unwatched resolved path pub fn unwatch(&mut self, path: &Path) -> FsWatcherResult { let watched_path = self.find_watched_path(path).map(|x| x.0.to_path_buf()); if let Some(watched_path) = watched_path { self.watcher.unwatch(watched_path.as_path())?; self.paths.remove(watched_path.as_path()); Ok(watched_path) } else { Err(FsWatcherError::PathNotWatched) } } /// Given a certain path, returns the path data associated to the path which /// is ancestor of that path in the current watched path fn find_watched_path(&self, p: &Path) -> Option<(&Path, &Path)> { self.paths .iter() .find(|(k, _)| path_utils::is_child_of(p, k)) .map(|(k, v)| (k.as_path(), v.as_path())) } /// Build `FsChange` from path to local `changed_file` fn build_fs_move(&self, source: PathBuf, destination: PathBuf) -> Option { if let Some((watched_local, watched_remote)) = self.find_watched_path(&source) { Some(FsChange::mov( source, destination, watched_local, watched_remote, )) } else { None } } /// Build `FsChange` from path to local `changed_file` fn build_fs_remove(&self, removed_path: PathBuf) -> Option { if let Some((watched_local, watched_remote)) = self.find_watched_path(&removed_path) { Some(FsChange::remove( removed_path, watched_local, watched_remote, )) } else { None } } /// Build `FsChange` from path to local `changed_file` fn build_fs_update(&self, changed_file: PathBuf) -> Option { if let Some((watched_local, watched_remote)) = self.find_watched_path(&changed_file) { Some(FsChange::update( changed_file, watched_local, watched_remote, )) } else { None } } } #[cfg(test)] mod test { use pretty_assertions::assert_eq; use tempfile::TempDir; use super::*; #[cfg(target_os = "macos")] use crate::utils::test_helpers; #[test] fn should_init_fswatcher() { let watcher = FsWatcher::init(Duration::from_secs(5)).unwrap(); assert!(watcher.paths.is_empty()); } #[test] fn should_watch_path() { let mut watcher = FsWatcher::init(Duration::from_secs(5)).unwrap(); let tempdir = TempDir::new().unwrap(); assert!( watcher .watch(tempdir.path(), Path::new("/tmp/test")) .is_ok() ); // check if in paths assert_eq!( watcher.paths.get(tempdir.path()).unwrap(), Path::new("/tmp/test") ); // close tempdir assert!(tempdir.close().is_ok()); } #[test] fn should_not_watch_path_if_subdir_of_watched_path() { let mut watcher = FsWatcher::init(Duration::from_secs(5)).unwrap(); let tempdir = TempDir::new().unwrap(); assert!( watcher .watch(tempdir.path(), Path::new("/tmp/test")) .is_ok() ); // watch subdir let mut subdir = tempdir.path().to_path_buf(); subdir.push("abc/def"); // should return already watched assert!( watcher .watch(subdir.as_path(), Path::new("/tmp/test/abc/def")) .is_err() ); // close tempdir assert!(tempdir.close().is_ok()); } #[test] fn should_unwatch_path() { let mut watcher = FsWatcher::init(Duration::from_secs(5)).unwrap(); let tempdir = TempDir::new().unwrap(); assert!( watcher .watch(tempdir.path(), Path::new("/tmp/test")) .is_ok() ); // unwatch assert!(watcher.unwatch(tempdir.path()).is_ok()); assert!(watcher.paths.get(tempdir.path()).is_none()); // close tempdir assert!(tempdir.close().is_ok()); } #[test] fn should_unwatch_path_when_subdir() { let mut watcher = FsWatcher::init(Duration::from_secs(5)).unwrap(); let tempdir = TempDir::new().unwrap(); assert!( watcher .watch(tempdir.path(), Path::new("/tmp/test")) .is_ok() ); // unwatch let mut subdir = tempdir.path().to_path_buf(); subdir.push("abc/def"); assert_eq!( watcher.unwatch(subdir.as_path()).unwrap().as_path(), Path::new(tempdir.path()) ); assert!(watcher.paths.get(tempdir.path()).is_none()); // close tempdir assert!(tempdir.close().is_ok()); } #[test] fn should_return_err_when_unwatching_unwatched_path() { let mut watcher = FsWatcher::init(Duration::from_secs(5)).unwrap(); assert!(watcher.unwatch(Path::new("/tmp")).is_err()); } #[test] fn should_tell_whether_path_is_watched() { let mut watcher = FsWatcher::init(Duration::from_secs(5)).unwrap(); let tempdir = TempDir::new().unwrap(); assert!( watcher .watch(tempdir.path(), Path::new("/tmp/test")) .is_ok() ); assert_eq!(watcher.watched(tempdir.path()), true); let mut subdir = tempdir.path().to_path_buf(); subdir.push("abc/def"); assert_eq!(watcher.watched(subdir.as_path()), true); assert_eq!(watcher.watched(Path::new("/tmp")), false); // close tempdir assert!(tempdir.close().is_ok()); } #[test] #[cfg(target_os = "macos")] fn should_poll_file_update() { let mut watcher = FsWatcher::init(Duration::from_millis(100)).unwrap(); let tempdir = TempDir::new().unwrap(); let tempdir_path = PathBuf::from(format!("/private{}", tempdir.path().display())); assert!( watcher .watch(tempdir_path.as_path(), Path::new("/tmp/test")) .is_ok() ); // create file let file_path = test_helpers::make_file_at(tempdir_path.as_path(), "test.txt").unwrap(); // wait std::thread::sleep(Duration::from_millis(500)); // wait till update loop { let fs_change = watcher.poll().unwrap(); if let Some(FsChange::Update(_)) = fs_change { break; } std::thread::sleep(Duration::from_millis(500)); } assert!(std::fs::remove_file(file_path.as_path()).is_ok()); // close tempdir assert!(tempdir.close().is_ok()); } #[test] #[cfg(target_os = "macos")] fn should_poll_file_removed() { let mut watcher = FsWatcher::init(Duration::from_millis(100)).unwrap(); let tempdir = TempDir::new().unwrap(); let tempdir_path = PathBuf::from(format!("/private{}", tempdir.path().display())); assert!( watcher .watch(tempdir_path.as_path(), Path::new("/tmp/test")) .is_ok() ); // create file let file_path = test_helpers::make_file_at(tempdir_path.as_path(), "test.txt").unwrap(); std::thread::sleep(Duration::from_millis(500)); // wait assert!(std::fs::remove_file(file_path.as_path()).is_ok()); // poll till remove loop { let fs_change = watcher.poll().unwrap(); if let Some(FsChange::Remove(remove)) = fs_change { assert_eq!(remove.path(), Path::new("/tmp/test/test.txt")); break; } std::thread::sleep(Duration::from_millis(500)); } // close tempdir assert!(tempdir.close().is_ok()); } /* #[test] #[cfg(posix)] fn should_poll_file_moved() { let mut watcher = FsWatcher::init(Duration::from_millis(100)).unwrap(); let tempdir = TempDir::new().unwrap(); let tempdir_path = PathBuf::from(format!("/private{}", tempdir.path().display())); assert!(watcher .watch(tempdir_path.as_path(), Path::new("/tmp/test")) .is_ok()); // create file let file_path = test_helpers::make_file_at(tempdir_path.as_path(), "test.txt").unwrap(); // wait std::thread::sleep(Duration::from_millis(500)); // move file let mut new_file_path = tempdir.path().to_path_buf(); new_file_path.push("new.txt"); assert!(std::fs::rename(file_path.as_path(), new_file_path.as_path()).is_ok()); std::thread::sleep(Duration::from_millis(500)); // wait till rename loop { let fs_change = watcher.poll().unwrap(); if let Some(FsChange::Move(mov)) = fs_change { assert_eq!(mov.source(), Path::new("/tmp/test/test.txt")); assert_eq!(mov.destination(), Path::new("/tmp/test/new.txt")); break; } std::thread::sleep(Duration::from_millis(500)); } // remove file assert!(std::fs::remove_file(new_file_path.as_path()).is_ok()); // close tempdir assert!(tempdir.close().is_ok()); } */ #[test] #[cfg(target_os = "macos")] fn should_poll_nothing() { let mut watcher = FsWatcher::init(Duration::from_secs(5)).unwrap(); let tempdir = TempDir::new().unwrap(); assert!( watcher .watch(tempdir.path(), Path::new("/tmp/test")) .is_ok() ); assert!(watcher.poll().ok().unwrap().is_none()); // close tempdir assert!(tempdir.close().is_ok()); } #[test] #[cfg(target_os = "macos")] fn should_get_watched_paths() { let mut watcher = FsWatcher::init(Duration::from_secs(5)).unwrap(); assert!(watcher.watch(Path::new("/tmp"), Path::new("/tmp")).is_ok()); assert!( watcher .watch(Path::new("/home"), Path::new("/home")) .is_ok() ); let mut watched_paths = watcher.watched_paths(); watched_paths.sort(); assert_eq!(watched_paths, vec![Path::new("/home"), Path::new("/tmp")]); } }