diff --git a/benchmark-new/benchmark_repository/Cargo.lock b/benchmark-new/benchmark_repository/Cargo.lock index 1d53e26aa..53a5e6a9b 100644 --- a/benchmark-new/benchmark_repository/Cargo.lock +++ b/benchmark-new/benchmark_repository/Cargo.lock @@ -35,6 +35,7 @@ dependencies = [ "notify 4.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "pretty_env_logger 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "walkdir 2.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "yaml-rust 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/benchmark-new/benchmark_repository/Cargo.toml b/benchmark-new/benchmark_repository/Cargo.toml index 24fe5642e..327852761 100644 --- a/benchmark-new/benchmark_repository/Cargo.toml +++ b/benchmark-new/benchmark_repository/Cargo.toml @@ -11,6 +11,7 @@ indicatif = "0.9" yaml-rust = "0.4" notify = "4" tempfile = "3" +walkdir = "2" [dependencies.cpython] git = "https://github.com/dgrunwald/rust-cpython/" diff --git a/benchmark-new/benchmark_repository/examples/test.rs b/benchmark-new/benchmark_repository/examples/test.rs index 5c0ac7d09..6456b7962 100644 --- a/benchmark-new/benchmark_repository/examples/test.rs +++ b/benchmark-new/benchmark_repository/examples/test.rs @@ -59,8 +59,5 @@ fn main() } } - let duration = time::Duration::from_secs(20); - thread::sleep(duration); - benchmark_repository.wait_for_autocommit_thread(); } diff --git a/benchmark-new/benchmark_repository/src/lib.rs b/benchmark-new/benchmark_repository/src/lib.rs index 072c4adc5..6aa20c609 100644 --- a/benchmark-new/benchmark_repository/src/lib.rs +++ b/benchmark-new/benchmark_repository/src/lib.rs @@ -5,36 +5,45 @@ extern crate log; extern crate indicatif; extern crate notify; extern crate tempfile; +extern crate walkdir; use git2::{Cred, Error, FetchOptions, Index, IndexEntry, IndexTime, Progress, PushOptions, RemoteCallbacks, ResetType, Repository, Signature, TreeBuilder}; use git2::build::{CheckoutBuilder, RepoBuilder}; -use std::fs::{File, create_dir_all, read}; +use std::fs::{File, create_dir_all, read, remove_file}; use std::io::Cursor; use std::path::{Path, PathBuf}; use std::string::String; use std::str; +use std::sync::{Arc, Mutex}; use std::sync::mpsc::{channel, Sender, Receiver}; use std::thread::{spawn, JoinHandle}; use indicatif::{ProgressBar, ProgressStyle}; -use notify::{raw_watcher, RecommendedWatcher, RawEvent, Op}; +use notify::{raw_watcher, RecommendedWatcher, RawEvent, Op, RecursiveMode, Watcher}; use tempfile::TempDir; +use walkdir::WalkDir; -pub struct BenchmarkRepository +pub struct BenchmarkRepositoryInner { repository: Repository, ssh_user: String, user_name: String, user_email: String, +} + +pub struct BenchmarkRepository +{ + inner: Arc>, autocommit_channel: (Sender, Receiver), autocommit_directory: Option, autocommit_thread: Option>, autocommit_watcher: Option, + autocommit_locks: Arc>, } -pub struct TargetPath<'a> +pub struct TargetPath { - pub source: &'a Path, - pub destination: &'a Path, + pub source: PathBuf, + pub destination: PathBuf, } impl BenchmarkRepository @@ -74,11 +83,13 @@ impl BenchmarkRepository fn reset_origin(&self, remote_url: &str) -> &Self { - let remote = match self.repository.find_remote("origin") + let inner = self.inner.lock().unwrap(); + + let remote = match inner.repository.find_remote("origin") { Ok(remote) => remote, Err(_) => - match self.repository.remote("origin", remote_url) + match inner.repository.remote("origin", remote_url) { Ok(remote) => remote, Err(error) => panic!("Could not reset remote “origin”: {}", error), @@ -92,6 +103,8 @@ impl BenchmarkRepository fn fetch_branch(&self, branch_name: &str) -> &Self { + let inner = self.inner.lock().unwrap(); + let mut progress_bar = ProgressBar::new(0); progress_bar.set_style(BenchmarkRepository::progress_bar_style()); @@ -100,10 +113,10 @@ impl BenchmarkRepository remote_callbacks.credentials( |_, _, _| { - match Cred::ssh_key_from_agent(&self.ssh_user) + match Cred::ssh_key_from_agent(&inner.ssh_user) { Ok(credentials) => Ok(credentials), - Err(error) => panic!("could not retrieve key pair for SSH authentication as user “{}”: {}", self.ssh_user, error), + Err(error) => panic!("could not retrieve key pair for SSH authentication as user “{}”: {}", inner.ssh_user, error), } }); @@ -113,7 +126,7 @@ impl BenchmarkRepository let mut fetch_options = FetchOptions::new(); fetch_options.remote_callbacks(remote_callbacks); - let mut origin = self.repository.find_remote("origin").expect("could not find remote “origin”"); + let mut origin = inner.repository.find_remote("origin").expect("could not find remote “origin”"); info!("Updating branch “{}”", branch_name); @@ -156,17 +169,24 @@ impl BenchmarkRepository Err(_) => BenchmarkRepository::init(base_path), }; - let benchmark_repository = - BenchmarkRepository + let benchmark_repository_inner = + BenchmarkRepositoryInner { repository: repository, ssh_user: ssh_user.to_string(), user_name: user_name.to_string(), user_email: user_email.to_string(), + }; + + let benchmark_repository = + BenchmarkRepository + { + inner: Arc::new(Mutex::new(benchmark_repository_inner)), autocommit_channel: channel(), autocommit_directory: None, autocommit_thread: None, autocommit_watcher: None, + autocommit_locks: Arc::new(Mutex::new(0)), }; benchmark_repository @@ -178,10 +198,10 @@ impl BenchmarkRepository benchmark_repository } - pub fn read_file_as_index_entry(&self, file_path: &Path, result_file_path: &Path) -> IndexEntry + pub fn read_file_as_index_entry(inner: &BenchmarkRepositoryInner, file_path: &Path, result_file_path: &Path) -> IndexEntry { // create a new blob with the file contents - let object_id = match self.repository.blob_path(file_path) + let object_id = match inner.repository.blob_path(file_path) { Ok(object_id) => object_id, Err(error) => panic!("Could not write blob for “{}”: {}", file_path.display(), error), @@ -206,10 +226,33 @@ impl BenchmarkRepository } } - pub fn commit_files(&self, file_paths: &[TargetPath], branch_name: &str) + pub fn commit_directory(inner: &BenchmarkRepositoryInner, directory_path: &TargetPath, branch_name: &str) + { + let mut file_paths = vec![]; + + for entry in WalkDir::new(&directory_path.source) + { + let entry = entry.unwrap(); + + if entry.path().file_name().unwrap() == ".lock" || entry.file_type().is_dir() + { + continue; + } + + let relative_path = entry.path().strip_prefix(&directory_path.source).unwrap(); + + trace!("Adding “{}” (from “{}”) to autocommit list", relative_path.display(), entry.path().display()); + + file_paths.push(TargetPath{source: entry.path().to_owned(), destination: directory_path.destination.join(&relative_path)}); + } + + BenchmarkRepository::commit_files(&inner, &file_paths, branch_name); + } + + pub fn commit_files(inner: &BenchmarkRepositoryInner, file_paths: &[TargetPath], branch_name: &str) { let tip_reference_name = format!("refs/remotes/origin/{}", branch_name); - let tip_reference = match self.repository.find_reference(&tip_reference_name) + let tip_reference = match inner.repository.find_reference(&tip_reference_name) { Ok(value) => value, Err(error) => panic!("Could not find reference “{}”: {}", tip_reference_name, error), @@ -235,7 +278,7 @@ impl BenchmarkRepository for target_path in file_paths { - let index_entry = self.read_file_as_index_entry(target_path.source, target_path.destination); + let index_entry = BenchmarkRepository::read_file_as_index_entry(inner, &target_path.source, &target_path.destination); if let Err(error) = index.add(&index_entry) { @@ -243,13 +286,13 @@ impl BenchmarkRepository } } - let tree_object_id = match index.write_tree_to(&self.repository) + let tree_object_id = match index.write_tree_to(&inner.repository) { Ok(value) => value, Err(error) => panic!("Could not write index to tree: {}", error), }; - let tree = match self.repository.find_tree(tree_object_id) + let tree = match inner.repository.find_tree(tree_object_id) { Ok(value) => value, Err(error) => panic!("Could obtain tree: {}", error), @@ -257,7 +300,7 @@ impl BenchmarkRepository info!("Created tree object “{}”", tree_object_id); - let signature = Signature::now(&self.user_name, &self.user_email).expect("Could not create signature"); + let signature = Signature::now(&inner.user_name, &inner.user_email).expect("Could not create signature"); let message = format!("Add files"); let parent = match tip_reference.peel_to_commit() @@ -266,7 +309,7 @@ impl BenchmarkRepository Err(error) => panic!("Could not peel reference: {}", error), }; - let commit_id = match self.repository.commit(Some(&tip_reference_name), &signature, &signature, &message, &tree, &[&parent]) + let commit_id = match inner.repository.commit(Some(&tip_reference_name), &signature, &signature, &message, &tree, &[&parent]) { Ok(value) => value, Err(error) => panic!("Could not write commit: {}", error), @@ -280,10 +323,10 @@ impl BenchmarkRepository remote_callbacks.credentials( |_, _, _| { - match Cred::ssh_key_from_agent(&self.ssh_user) + match Cred::ssh_key_from_agent(&inner.ssh_user) { Ok(value) => Ok(value), - Err(error) => panic!("could not retrieve key pair for SSH authentication as user “{}”: {}", self.ssh_user, error), + Err(error) => panic!("could not retrieve key pair for SSH authentication as user “{}”: {}", inner.ssh_user, error), } }); @@ -293,14 +336,16 @@ impl BenchmarkRepository let mut push_options = PushOptions::new(); push_options.remote_callbacks(remote_callbacks); - let mut remote = self.repository.find_remote("origin").expect(""); + let mut remote = inner.repository.find_remote("origin").expect(""); remote.push(&[&push_refspec], Some(&mut push_options)).expect("couldn’t push"); } pub fn file_exists(&self, file_path: &Path, branch_name: &str) -> bool { + let inner = self.inner.lock().unwrap(); + let tip_reference_name = format!("refs/remotes/origin/{}", branch_name); - let tip_reference = match self.repository.find_reference(&tip_reference_name) + let tip_reference = match inner.repository.find_reference(&tip_reference_name) { Ok(value) => value, Err(error) => panic!("Could not find reference “{}”: {}", tip_reference_name, error), @@ -318,7 +363,7 @@ impl BenchmarkRepository Err(error) => return false, }; - let blob = match self.repository.find_blob(object_id) + let blob = match inner.repository.find_blob(object_id) { Ok(blob) => blob, Err(error) => return false, @@ -329,8 +374,10 @@ impl BenchmarkRepository pub fn read_file(&self, file_path: &Path, branch_name: &str) -> Option { + let inner = self.inner.lock().unwrap(); + let tip_reference_name = format!("refs/remotes/origin/{}", branch_name); - let tip_reference = match self.repository.find_reference(&tip_reference_name) + let tip_reference = match inner.repository.find_reference(&tip_reference_name) { Ok(value) => value, Err(error) => panic!("Could not find reference “{}”: {}", tip_reference_name, error), @@ -348,7 +395,7 @@ impl BenchmarkRepository Err(error) => return None, }; - let blob = match self.repository.find_blob(object_id) + let blob = match inner.repository.find_blob(object_id) { Ok(blob) => blob, Err(error) => return None, @@ -376,6 +423,8 @@ impl BenchmarkRepository Err(error) => panic!("Could not create autocommit directory: {}", error), }; + trace!("Created temporary autocommit directory in {}", tmp_dir.path().display()); + let lock_file_path = tmp_dir.path().join(".lock"); if let Err(error) = File::create(&lock_file_path) @@ -385,22 +434,27 @@ impl BenchmarkRepository let (sender, receiver) = channel(); - let watcher = match raw_watcher(sender) + let mut watcher = match raw_watcher(sender) { Ok(value) => value, Err(error) => panic!("Could not create filesystem watcher: {}", error), }; + watcher.watch(&tmp_dir, RecursiveMode::Recursive); + self.autocommit_watcher = Some(watcher); self.autocommit_directory = Some(tmp_dir); - trace!("{:#?}", receiver); + let autocommit_base_path = self.autocommit_directory.as_ref().unwrap().path().to_owned(); + let inner = self.inner.clone(); + let autocommit_locks = self.autocommit_locks.clone(); let thread = spawn( move || { trace!("Autocommit thread running"); - trace!("{:#?}", receiver); + + let mut active = true; loop { @@ -408,12 +462,48 @@ impl BenchmarkRepository { Ok(RawEvent{path: Some(path), op: Ok(Op::REMOVE), cookie: _}) => { - trace!("Received “remove” event for path “{}”", path.display()); + if path == lock_file_path + { + trace!("Waiting for remaining autocommit locks to be released"); + active = false; + } + else + { + trace!("Received “remove” event for path “{}”", path.display()); + + let stripped_path = match path.strip_prefix(&autocommit_base_path) + { + Ok(value) => value, + Err(error) => panic!("Cannot handle “remove” event for path “{}”", path.display()), + }; + + let mut components = stripped_path.components(); + let branch_name = components.next().unwrap(); + let result_path = stripped_path.strip_prefix(&branch_name).unwrap().parent().unwrap(); + + info!("Autocommiting “{}”", result_path.display()); + + let inner = inner.lock().unwrap(); + + BenchmarkRepository::commit_directory(&inner, &TargetPath{source: path.parent().unwrap().to_owned(), destination: result_path.to_owned()}, &branch_name.as_os_str().to_str().unwrap()); + + let mut autocommit_locks = autocommit_locks.lock().unwrap(); + *autocommit_locks -= 1; + } }, Err(error) => panic!("Error handling notify event: {}", error), _ => (), } + + let mut autocommit_locks = autocommit_locks.lock().unwrap(); + + if *autocommit_locks == 0 + { + break; + } } + + trace!("Autocommit thread finished"); }); self.autocommit_thread = Some(thread); @@ -447,6 +537,9 @@ impl BenchmarkRepository } } + let mut autocommit_locks = self.autocommit_locks.lock().unwrap(); + *autocommit_locks += 1; + Ok(result_directory_path) } @@ -457,6 +550,15 @@ impl BenchmarkRepository panic!("No autocommit thread started"); } + let lock_file_path = self.autocommit_directory.as_ref().unwrap().path().join(".lock"); + + if let Err(error) = remove_file(&lock_file_path) + { + panic!("Could not remove lock file “{}”: {}", lock_file_path.display(), error); + } + + info!("Removed lock file"); + let autocommit_thread = self.autocommit_thread; if let Err(RecvError) = autocommit_thread.unwrap().join()