improved multithreading behaviour, & a new flag!

`-j`/`--jobs` can be used to manually set the number of threads to use for scanning files. additionally, fif won't bother with multi-threaded scanning when there are less than 32 files to scan.
This commit is contained in:
Lynne Megido 2021-09-25 18:55:50 +10:00
parent 741048839c
commit 5e17e4efda
Signed by: lynnesbian
GPG Key ID: F0A184B5213D9F90
10 changed files with 80 additions and 17 deletions

View File

@ -6,6 +6,7 @@
<option name="requiredFeatures" value="true" /> <option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" /> <option name="allFeatures" value="false" />
<option name="emulateTerminal" value="false" /> <option name="emulateTerminal" value="false" />
<option name="withSudo" value="false" />
<option name="backtrace" value="SHORT" /> <option name="backtrace" value="SHORT" />
<envs> <envs>
<env name="RUST_LOG" value="debug" /> <env name="RUST_LOG" value="debug" />

View File

@ -2,5 +2,6 @@
<project version="4"> <project version="4">
<component name="VcsDirectoryMappings"> <component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" /> <mapping directory="$PROJECT_DIR$" vcs="Git" />
<mapping directory="$PROJECT_DIR$/src/walkdir" vcs="Git" />
</component> </component>
</project> </project>

View File

@ -8,6 +8,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added ### Added
- AIFF (Audio Interchange File Format, a PCM audio format like WAV) detection to [`infer`] backend - AIFF (Audio Interchange File Format, a PCM audio format like WAV) detection to [`infer`] backend
- `--version` output now includes the (short) hash of the git commit fif was built from - `--version` output now includes the (short) hash of the git commit fif was built from
- `-j`/`--jobs` flag for specifying the number of threads fif should use for scanning files
### Changed
- fif will no longer use multithreading when scanning less than 32 files - the overhead of spawning threads isn't really
worth it
### Other ### Other
- Refactoring - split fif into `main.rs` and `lib.rs`, moved file-related functionality (directory scanning, etc.) into - Refactoring - split fif into `main.rs` and `lib.rs`, moved file-related functionality (directory scanning, etc.) into
files module, removed string module, etc. files module, removed string module, etc.

3
Cargo.lock generated
View File

@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3
[[package]] [[package]]
name = "arrayvec" name = "arrayvec"
version = "0.5.2" version = "0.5.2"
@ -186,6 +188,7 @@ dependencies = [
"log", "log",
"mime", "mime",
"new_mime_guess", "new_mime_guess",
"num_cpus",
"once_cell", "once_cell",
"rand", "rand",
"rayon", "rayon",

View File

@ -17,7 +17,7 @@ maintenance = { status = "experimental" }
[features] [features]
default = ["multi-threaded", "json"] default = ["multi-threaded", "json"]
multi-threaded = ["rayon"] multi-threaded = ["rayon", "num_cpus"]
infer-backend = ["infer"] infer-backend = ["infer"]
xdg-mime-backend = ["xdg-mime"] xdg-mime-backend = ["xdg-mime"]
json = ["serde", "serde_json"] json = ["serde", "serde_json"]
@ -36,6 +36,7 @@ itertools = "0.10.0"
serde = { version = "1.0", features = ["derive"], optional = true } serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = { version = "1.0", optional = true } serde_json = { version = "1.0", optional = true }
bitflags = "~1.2.1" # 1.3+ requires Rust >= 1.46 bitflags = "~1.2.1" # 1.3+ requires Rust >= 1.46
num_cpus = { version = "1.13.0", optional = true }
[target.'cfg(not(unix))'.dependencies] [target.'cfg(not(unix))'.dependencies]
xdg-mime = { version = "0.3.3", optional = true } xdg-mime = { version = "0.3.3", optional = true }

View File

@ -136,25 +136,42 @@ pub fn scan_file(entry: &DirEntry, canonical_paths: bool) -> Result<Findings, Sc
} }
/// Takes a slice of [`DirEntry`]s and calls [`scan_file`] on each one, returning the results in a vector. /// Takes a slice of [`DirEntry`]s and calls [`scan_file`] on each one, returning the results in a vector.
pub fn scan_from_walkdir(entries: &[DirEntry], canonical_paths: bool) -> Vec<Result<Findings, ScanError>> { pub fn scan_from_walkdir(
entries: &[DirEntry],
canonical_paths: bool,
use_threads: bool,
) -> Vec<Result<Findings, ScanError>> {
cfg_if! { cfg_if! {
if #[cfg(feature = "multi-threaded")] { if #[cfg(feature = "multi-threaded")] {
use rayon::prelude::*; use rayon::prelude::*;
const CHUNKS: usize = 32;
// split the entries into chunks of 32, and iterate over each chunk of entries in a separate thread if use_threads && entries.len() > CHUNKS {
entries // split the entries into chunks of 32, and iterate over each chunk of entries in a separate thread
.par_chunks(32) return entries
.flat_map(|chunk| { .par_chunks(CHUNKS)
chunk .flat_map(|chunk| {
.iter() // iter over the chunk, which is a slice of DirEntry structs chunk
.map(|entry| scan_file(entry, canonical_paths)) .iter() // iter over the chunk, which is a slice of DirEntry structs
.collect::<Vec<_>>() .map(|entry| scan_file(entry, canonical_paths))
}) .collect::<Vec<_>>() // TODO: is there a way to avoid having to collect here?
.collect() })
.collect()
}
} else { } else {
entries.iter().map(|entry: &DirEntry| scan_file(entry, canonical_paths)).collect() // should always be false when multi-threading is disabled at compile time
assert!(!use_threads)
} }
} }
// if we end up here, either
// - there were less than CHUNKS files to scan, or
// - the user specified that only one thread should be used, by specifying `-j 1`
// - fif was compiled without the `multi-threading` feature
entries
.iter()
.map(|entry: &DirEntry| scan_file(entry, canonical_paths))
.collect()
} }
/// Scans a given directory with [`WalkDir`], filters with [`wanted_file`], checks for errors, and returns a vector of /// Scans a given directory with [`WalkDir`], filters with [`wanted_file`], checks for errors, and returns a vector of

View File

@ -7,7 +7,7 @@ use std::os::unix::ffi::OsStrExt;
use std::path::Path; use std::path::Path;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use itertools::{Either, Itertools}; use itertools::Itertools;
use snailquote::escape; use snailquote::escape;
use crate::findings::ScanError; use crate::findings::ScanError;
@ -338,6 +338,8 @@ pub struct Json;
#[cfg(feature = "json")] #[cfg(feature = "json")]
impl Format for Json { impl Format for Json {
fn write_all<W: Write>(&self, f: &mut W, entries: &Entries) -> io::Result<()> { fn write_all<W: Write>(&self, f: &mut W, entries: &Entries) -> io::Result<()> {
use itertools::Either;
#[derive(serde::Serialize)] #[derive(serde::Serialize)]
struct SerdeEntries<'a> { struct SerdeEntries<'a> {
errors: &'a Vec<&'a ScanError<'a>>, errors: &'a Vec<&'a ScanError<'a>>,

View File

@ -20,6 +20,7 @@
use std::io::{stdout, BufWriter, Write}; use std::io::{stdout, BufWriter, Write};
use std::process::exit; use std::process::exit;
use cfg_if::cfg_if;
use clap::Clap; use clap::Clap;
use fif::files::{scan_directory, scan_from_walkdir}; use fif::files::{scan_directory, scan_from_walkdir};
use fif::formats::Format; use fif::formats::Format;
@ -87,7 +88,28 @@ fn main() {
trace!("Found {} items to check", entries.len()); trace!("Found {} items to check", entries.len());
let results: Vec<_> = scan_from_walkdir(&entries, args.canonical_paths) cfg_if! {
if #[cfg(feature = "multi-threaded")] {
let use_threads = args.jobs != 1;
if use_threads {
// 0 is a special case - it should be understood to mean "all available host CPUs"
let jobs = if args.jobs == 0 { num_cpus::get() } else { args.jobs };
// set up the global thread pool with the requested number of threads
rayon::ThreadPoolBuilder::new().num_threads(jobs).build_global().unwrap();
trace!("Multithreading enabled, using {} threads", jobs);
} else {
trace!("Multithreading disabled at runtime");
}
} else { // `multi-threading` feature disabled
let use_threads = false;
trace!("Multithreading disabled at compile time");
}
}
let results: Vec<_> = scan_from_walkdir(&entries, args.canonical_paths, use_threads)
.into_iter() .into_iter()
.filter( .filter(
|result| result.is_err() || !result.as_ref().unwrap().valid, |result| result.is_err() || !result.as_ref().unwrap().valid,

View File

@ -133,6 +133,14 @@ pub struct Parameters {
/// For example, with this option, fif will not rename "image.unknown" to "image.jpg" /// For example, with this option, fif will not rename "image.unknown" to "image.jpg"
#[clap(short = 'I', long)] #[clap(short = 'I', long)]
pub ignore_unknown_exts: bool, pub ignore_unknown_exts: bool,
#[cfg(feature = "multi-threaded")]
/// Number of jobs (threads) to use when scanning results.
/// The default behaviour is to use one thread per CPU thread. This behaviour can be manually requested by setting
/// `-j 0`. Using `-j 1` will disable multi-threading behaviour, as if you had compiled fif with the multi-threading
/// feature disabled. Setting more jobs than you have CPU threads is not recommended.
#[clap(short = 'j', long, default_value = "0")]
pub jobs: usize,
} }
fn lowercase_exts(exts: &str) -> Result<(), String> { fn lowercase_exts(exts: &str) -> Result<(), String> {

View File

@ -114,8 +114,10 @@ fn simple_directory() {
// there should be one file missing: "ignore.fake_ext" // there should be one file missing: "ignore.fake_ext"
assert_eq!(entries.len(), files.len() - 1); assert_eq!(entries.len(), files.len() - 1);
let results = scan_from_walkdir(&entries, false); let use_threads = cfg!(feature = "multi-threaded");
let canonical_results = scan_from_walkdir(&entries, true);
let results = scan_from_walkdir(&entries, false, use_threads);
let canonical_results = scan_from_walkdir(&entries, true, use_threads);
assert_eq!(results.len(), canonical_results.len()); assert_eq!(results.len(), canonical_results.len());
for (result, canonical_result) in results.iter().zip(canonical_results.iter()) { for (result, canonical_result) in results.iter().zip(canonical_results.iter()) {
@ -291,6 +293,8 @@ fn rejects_bad_args() {
vec!["fif", "-X", "pebis"], vec!["fif", "-X", "pebis"],
// `-e` with nothing but commas: // `-e` with nothing but commas:
vec!["fif", "-e", ",,,,,"], vec!["fif", "-e", ",,,,,"],
// `-j` with a negative value:
vec!["fif", "-j", "-1"],
]; ];
for test in &tests { for test in &tests {