diff --git a/Cargo.toml b/Cargo.toml
index 5d9adee..c40765e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,6 +20,7 @@ build = "build.rs"
 [dependencies]
 rand = "0.3"
 rustc-serialize = "0.3"
+crossbeam = "0.2.9"

 [build-dependencies]
 gcc = "0.3"
diff --git a/changelog.md b/changelog.md
index 628c21f..e02b8cb 100644
--- a/changelog.md
+++ b/changelog.md
@@ -1,4 +1,10 @@
 # Changelog
+
+## [unreleased][unreleased]
+### Added
+- factorization machines
+- parallel fitting and prediction for one-vs-rest models
+
 ## [0.3.1][2016-03-01]
 ### Changed
 - NonzeroIterable now takes &self
diff --git a/readme.md b/readme.md
index a4f9355..5126173 100644
--- a/readme.md
+++ b/readme.md
@@ -9,7 +9,7 @@ For full usage details, see the [API documentation](https://maciejkula.github.io

 ## Introduction

-This crate is mostly an excuse for me to learn Rust. Nevertheless, it contains reasonably effective
+This crate contains reasonably effective
 implementations of a number of common machine learning algorithms.

 At the moment, `rustlearn` uses its own basic dense and sparse array types, but I will be happy
@@ -43,6 +43,10 @@ should be roughly competitive with Python `sklearn` implementations, both in acc
 - [accuracy](https://maciejkula.github.io/rustlearn/doc/rustlearn/metrics/fn.accuracy_score.html)
 - [ROC AUC score](https://maciejkula.github.io/rustlearn/doc/rustlearn/metrics/ranking/fn.roc_auc_score.html)

+## Parallelization
+
+A number of models support both parallel model fitting and prediction.
+
 ### Model serialization

 Model serialization is supported via `rustc_serialize`. This will probably change to `serde` once compiler plugins land in stable.
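For context on the readme claim above, this is roughly what the new parallel API looks like end to end. A minimal sketch, assuming the `one_vs_rest()` builder on `SGDClassifier`'s hyperparameters and that the new parallel traits are reachable through the prelude; the thread count is illustrative:

```rust
extern crate rustlearn;

use rustlearn::prelude::*;
use rustlearn::datasets::iris;
use rustlearn::linear_models::sgdclassifier::Hyperparameters;

fn main() {
    let (data, target) = iris::load_data();

    // A one-vs-rest wrapper over a two-class model.
    let mut model = Hyperparameters::new(data.cols()).one_vs_rest();

    // Fit the per-class submodels two at a time, then predict in parallel.
    model.fit_parallel(&data, &target, 2).unwrap();
    let predictions = model.predict_parallel(&data, 2).unwrap();

    assert!(predictions.rows() == data.rows());
}
```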
diff --git a/src/array/dense.rs b/src/array/dense.rs
index 7ba48fe..a41d1cc 100644
--- a/src/array/dense.rs
+++ b/src/array/dense.rs
@@ -111,6 +111,7 @@
 use std::iter::Iterator;
+use std::ops::Range;

 use array::traits::*;
@@ -144,6 +145,7 @@ pub struct ArrayView<'a> {

 /// Iterator over row or column views of a dense matrix.
 pub struct ArrayIterator<'a> {
+    stop: usize,
     idx: usize,
     axis: ArrayIteratorAxis,
     array: &'a Array,
 }
@@ -155,12 +157,7 @@ impl<'a> Iterator for ArrayIterator<'a> {

     fn next(&mut self) -> Option<ArrayView<'a>> {

-        let bound = match self.axis {
-            ArrayIteratorAxis::Row => self.array.rows,
-            ArrayIteratorAxis::Column => self.array.cols,
-        };
-
-        let result = if self.idx < bound {
+        let result = if self.idx < self.stop {
             Some(ArrayView {
                 idx: self.idx,
                 axis: self.axis,
@@ -182,6 +179,7 @@ impl<'a> RowIterable for &'a Array {
     type Output = ArrayIterator<'a>;
     fn iter_rows(self) -> ArrayIterator<'a> {
         ArrayIterator {
+            stop: self.rows(),
             idx: 0,
             axis: ArrayIteratorAxis::Row,
             array: self,
@@ -196,6 +194,21 @@ impl<'a> RowIterable for &'a Array {
             array: self,
         }
     }
+
+    fn iter_rows_range(self, range: Range<usize>) -> ArrayIterator<'a> {
+        let stop = if range.end > self.rows {
+            self.rows
+        } else {
+            range.end
+        };
+
+        ArrayIterator {
+            stop: stop,
+            idx: range.start,
+            axis: ArrayIteratorAxis::Row,
+            array: self,
+        }
+    }
 }
@@ -204,6 +217,7 @@ impl<'a> ColumnIterable for &'a Array {
     type Output = ArrayIterator<'a>;
     fn iter_columns(self) -> ArrayIterator<'a> {
         ArrayIterator {
+            stop: self.cols(),
             idx: 0,
             axis: ArrayIteratorAxis::Column,
             array: self,
@@ -218,6 +232,21 @@ impl<'a> ColumnIterable for &'a Array {
             array: self,
         }
     }
+
+    fn iter_columns_range(self, range: Range<usize>) -> ArrayIterator<'a> {
+        let stop = if range.end > self.cols {
+            self.cols
+        } else {
+            range.end
+        };
+
+        ArrayIterator {
+            stop: stop,
+            idx: range.start,
+            axis: ArrayIteratorAxis::Column,
+            array: self,
+        }
+    }
 }
@@ -1018,4 +1047,21 @@ mod tests {
             }
         }
     }
+
+    use datasets::iris;
+
+    #[test]
+    fn range_iteration() {
+        let (data, _) = iris::load_data();
+
+        let (start, stop) = (5, 10);
+
+        for (row_num, row) in data.iter_rows_range(start..stop).enumerate() {
+            for (col_idx, value) in row.iter_nonzero() {
+                assert!(value == data.get(start + row_num, col_idx));
+            }
+
+            assert!(row_num < (stop - start));
+        }
+    }
 }
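The new `iter_rows_range` clamps the end of the range to the number of rows, so an over-long range is truncated rather than a panic. A small usage sketch against the public API:

```rust
use rustlearn::prelude::*;

fn main() {
    // A 3x2 matrix: rows [1, 2], [3, 4], [5, 6].
    let data = Array::from(&vec![vec![1.0, 2.0],
                                 vec![3.0, 4.0],
                                 vec![5.0, 6.0]]);

    // Visit rows 1 and 2 only; `1..10` would be clamped to `1..3`.
    for (offset, row) in data.iter_rows_range(1..3).enumerate() {
        for (col_idx, value) in row.iter_nonzero() {
            assert!(value == data.get(1 + offset, col_idx));
        }
    }
}
```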
diff --git a/src/array/sparse.rs b/src/array/sparse.rs
index ed0aa63..e81d759 100644
--- a/src/array/sparse.rs
+++ b/src/array/sparse.rs
@@ -37,6 +37,7 @@
 //!
 //! ```
 use std::iter::Iterator;
+use std::ops::Range;

 use array::dense::*;
 use array::traits::*;
@@ -79,8 +80,8 @@ pub struct SparseArrayViewIterator<'a> {

 /// Iterator over row or column views of a sparse matrix.
 pub struct SparseArrayIterator<'a> {
+    stop: usize,
     idx: usize,
-    dim: usize,
     indices: &'a Vec<Vec<usize>>,
     data: &'a Vec<Vec<f32>>,
 }
@@ -290,7 +291,22 @@ impl<'a> RowIterable for &'a SparseRowArray {
     fn iter_rows(self) -> SparseArrayIterator<'a> {
         SparseArrayIterator {
             idx: 0,
-            dim: self.rows,
+            stop: self.rows,
             indices: &self.indices,
             data: &self.data,
         }
     }
+
+    fn iter_rows_range(self, range: Range<usize>) -> SparseArrayIterator<'a> {
+        let stop = if range.end > self.rows {
+            self.rows
+        } else {
+            range.end
+        };
+
+        SparseArrayIterator {
+            stop: stop,
+            idx: range.start,
+            indices: &self.indices,
+            data: &self.data,
+        }
+    }
@@ -388,11 +404,27 @@ impl<'a> ColumnIterable for &'a SparseColumnArray {
     fn iter_columns(self) -> SparseArrayIterator<'a> {
         SparseArrayIterator {
             idx: 0,
-            dim: self.cols,
+            stop: self.cols,
             indices: &self.indices,
             data: &self.data,
         }
     }
+
+    fn iter_columns_range(self, range: Range<usize>) -> SparseArrayIterator<'a> {
+        let stop = if range.end > self.cols {
+            self.cols
+        } else {
+            range.end
+        };
+
+        SparseArrayIterator {
+            stop: stop,
+            idx: range.start,
+            indices: &self.indices,
+            data: &self.data,
+        }
+    }

     fn view_column(self, idx: usize) -> SparseArrayView<'a> {
         SparseArrayView {
             indices: &self.indices[idx],
@@ -457,7 +489,7 @@ impl<'a> Iterator for SparseArrayIterator<'a> {

     fn next(&mut self) -> Option<SparseArrayView<'a>> {

-        let result = if self.idx < self.dim {
+        let result = if self.idx < self.stop {
             Some(SparseArrayView {
                 indices: &self.indices[self.idx][..],
                 data: &self.data[self.idx][..],
@@ -595,4 +627,33 @@ mod tests {
                          &dense_arr.get_rows(&vec![1, 0])));
         assert!(allclose(&arr.get_rows(&(..)).todense(),
                          &dense_arr.get_rows(&(..))));
     }
+
+    use datasets::iris;
+
+    #[test]
+    fn range_iteration() {
+        let (data, _) = iris::load_data();
+
+        let (start, stop) = (5, 10);
+
+        let data = SparseRowArray::from(&data);
+
+        for (row_num, row) in data.iter_rows_range(start..stop).enumerate() {
+            for (col_idx, value) in row.iter_nonzero() {
+                assert!(value == data.get(start + row_num, col_idx));
+            }
+
+            assert!(row_num < (stop - start));
+        }
+
+        let (start, stop) = (1, 3);
+
+        let data = SparseColumnArray::from(&data);
+
+        for (col_num, col) in data.iter_columns_range(start..stop).enumerate() {
+            for (row_idx, value) in col.iter_nonzero() {
+                assert!(value == data.get(row_idx, start + col_num));
+            }
+        }
+    }
 }
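The sparse arrays get the same range iterators; on a `SparseColumnArray` the range selects columns, and only nonzero entries are yielded. A minimal sketch:

```rust
use rustlearn::prelude::*;

fn main() {
    let dense = Array::from(&vec![vec![1.0, 0.0, 2.0],
                                  vec![0.0, 3.0, 0.0]]);
    let sparse = SparseColumnArray::from(&dense);

    // Iterate over columns 1..3 of the matrix.
    for (col_num, col) in sparse.iter_columns_range(1..3).enumerate() {
        for (row_idx, value) in col.iter_nonzero() {
            assert!(value == dense.get(row_idx, 1 + col_num));
        }
    }
}
```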
diff --git a/src/array/traits.rs b/src/array/traits.rs
index 1e17b04..9926719 100644
--- a/src/array/traits.rs
+++ b/src/array/traits.rs
@@ -76,6 +76,8 @@ pub trait RowIterable {
     type Output: Iterator<Item = Self::Item>;
     /// Iterate over rows of the matrix.
     fn iter_rows(self) -> Self::Output;
+    /// Iterate over a subset of rows of the matrix.
+    fn iter_rows_range(self, range: Range<usize>) -> Self::Output;
     /// View a row of the matrix.
     fn view_row(self, idx: usize) -> Self::Item;
 }
@@ -88,6 +90,8 @@ pub trait ColumnIterable {
     type Output: Iterator<Item = Self::Item>;
     /// Iterate over columns of a the matrix.
     fn iter_columns(self) -> Self::Output;
+    /// Iterate over a subset of columns of the matrix.
+    fn iter_columns_range(self, range: Range<usize>) -> Self::Output;
     /// View a column of the matrix.
     fn view_column(self, idx: usize) -> Self::Item;
 }
diff --git a/src/datasets/mod.rs b/src/datasets/mod.rs
index fb3fd6b..b723b46 100644
--- a/src/datasets/mod.rs
+++ b/src/datasets/mod.rs
@@ -3,5 +3,5 @@
 pub mod iris;

 #[cfg(test)]
-#[cfg(feature = "all_tests")]
+#[cfg(any(feature = "all_tests", feature = "bench"))]
 pub mod newsgroups;
diff --git a/src/ensemble/random_forest.rs b/src/ensemble/random_forest.rs
index 4717228..41c9edd 100644
--- a/src/ensemble/random_forest.rs
+++ b/src/ensemble/random_forest.rs
@@ -107,15 +107,14 @@ impl Hyperparameters {
 }

-#[derive(RustcEncodable, RustcDecodable)]
-#[derive(Clone)]
+#[derive(RustcEncodable, RustcDecodable, Clone)]
 pub struct RandomForest {
     trees: Vec<DecisionTree>,
     rng: EncodableRng,
 }

-impl SupervisedModel<Array> for RandomForest {
+impl<'a> SupervisedModel<&'a Array> for RandomForest {
     fn fit(&mut self, X: &Array, y: &Array) -> Result<(), &'static str> {

         let mut rng = self.rng.clone();
@@ -145,7 +144,7 @@ impl SupervisedModel<Array> for RandomForest {
 }

-impl SupervisedModel<SparseRowArray> for RandomForest {
+impl<'a> SupervisedModel<&'a SparseRowArray> for RandomForest {
     fn fit(&mut self, X: &SparseRowArray, y: &Array) -> Result<(), &'static str> {

         let mut rng = self.rng.clone();
@@ -253,6 +252,47 @@ mod tests {
         assert!(test_accuracy > 0.96);
     }

+    #[test]
+    fn test_random_forest_iris_parallel() {
+        let (data, target) = load_data();
+
+        let mut test_accuracy = 0.0;
+
+        let no_splits = 10;
+
+        let mut cv = CrossValidation::new(data.rows(), no_splits);
+        cv.set_rng(StdRng::from_seed(&[100]));
+
+        for (train_idx, test_idx) in cv {
+
+            let x_train = data.get_rows(&train_idx);
+            let x_test = data.get_rows(&test_idx);
+
+            let y_train = target.get_rows(&train_idx);
+
+            let mut tree_params = decision_tree::Hyperparameters::new(data.cols());
+            tree_params.min_samples_split(10)
+                       .max_features(4)
+                       .rng(StdRng::from_seed(&[100]));
+
+            let mut model = Hyperparameters::new(tree_params, 10)
+                                .rng(StdRng::from_seed(&[100]))
+                                .one_vs_rest();
+
+            model.fit_parallel(&x_train, &y_train, 2).unwrap();
+
+            let test_prediction = model.predict_parallel(&x_test, 2).unwrap();
+
+            test_accuracy += accuracy_score(&target.get_rows(&test_idx), &test_prediction);
+        }
+
+        test_accuracy /= no_splits as f32;
+
+        println!("Accuracy {}", test_accuracy);
+
+        assert!(test_accuracy > 0.96);
+    }
+
     #[test]
     fn test_random_forest_iris_sparse() {
         let (data, target) = load_data();
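The new random forest test exercises parallelism at the one-vs-rest level: each binary subproblem is fitted on its own thread. Condensed into a usage sketch (seeds are omitted and the thread count is illustrative):

```rust
use rustlearn::prelude::*;
use rustlearn::datasets::iris;
use rustlearn::ensemble::random_forest::Hyperparameters;
use rustlearn::trees::decision_tree;

fn main() {
    let (data, target) = iris::load_data();

    let mut tree_params = decision_tree::Hyperparameters::new(data.cols());
    tree_params.min_samples_split(10)
               .max_features(4);

    // Ten trees per class, wrapped in a one-vs-rest ensemble.
    let mut model = Hyperparameters::new(tree_params, 10).one_vs_rest();

    // One label's forest per thread, two threads at a time.
    model.fit_parallel(&data, &target, 2).unwrap();
    let predictions = model.predict_parallel(&data, 2).unwrap();

    assert!(predictions.rows() == data.rows());
}
```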
diff --git a/src/factorization/factorization_machines.rs b/src/factorization/factorization_machines.rs
index 59bcc47..4ff9873 100644
--- a/src/factorization/factorization_machines.rs
+++ b/src/factorization/factorization_machines.rs
@@ -10,6 +10,11 @@
 //! a higher setting will make the model more expressive at the expense of training time and
 //! risk of overfitting.
 //!
+//! # Parallelism
+//!
+//! The model supports multithreaded model fitting via asynchronous stochastic
+//! gradient descent (Hogwild).
+//!
 //! # Examples
 //!
 //! ```
@@ -28,6 +33,8 @@
 //! ```
 #![allow(non_snake_case)]

+use std::cmp;
+
 use prelude::*;

 use multiclass::OneVsRestWrapper;
@@ -36,6 +43,9 @@ use utils::{check_data_dimensionality, check_matched_dimensions, check_valid_labels};

 use rand;
 use rand::distributions::IndependentSample;

+use crossbeam;
+
+
 fn sigmoid(x: f32) -> f32 {
     1.0 / (1.0 + (-x).exp())
 }
@@ -344,6 +354,10 @@ impl FactorizationMachine {

     /// Perform a dummy update pass over all features to force regularization to be applied.
     fn regularize_all(&mut self) {
+        if self.l1_penalty == 0.0 && self.l2_penalty == 0.0 {
+            return;
+        }
+
         let array = Array::ones(1, self.dim);
         let num_components = self.num_components;
@@ -365,8 +379,11 @@ impl FactorizationMachine {
 }

-impl SupervisedModel<Array> for FactorizationMachine {
-    fn fit(&mut self, X: &Array, y: &Array) -> Result<(), &'static str> {
+impl<'a, T> SupervisedModel<&'a T> for FactorizationMachine
+    where &'a T: RowIterable,
+          T: IndexableMatrix
+{
+    fn fit(&mut self, X: &'a T, y: &Array) -> Result<(), &'static str> {

         try!(check_data_dimensionality(self.dim, X));
         try!(check_matched_dimensions(X, y));
@@ -375,7 +392,7 @@ impl SupervisedModel for FactorizationMachine {
         self.fit_sigmoid(X, y)
     }

-    fn decision_function(&self, X: &Array) -> Result<Array, &'static str> {
+    fn decision_function(&self, X: &'a T) -> Result<Array, &'static str> {

         try!(check_data_dimensionality(self.dim, X));

@@ -393,30 +410,53 @@ impl SupervisedModel for FactorizationMachine {
 }

-impl SupervisedModel<SparseRowArray> for FactorizationMachine {
-    fn fit(&mut self, X: &SparseRowArray, y: &Array) -> Result<(), &'static str> {
+impl<'a, T> ParallelSupervisedModel<&'a T> for FactorizationMachine
+    where &'a T: RowIterable,
+          T: IndexableMatrix + Sync
+{
+    fn fit_parallel(&mut self,
+                    X: &'a T,
+                    y: &Array,
+                    num_threads: usize)
+                    -> Result<(), &'static str> {

         try!(check_data_dimensionality(self.dim, X));
         try!(check_matched_dimensions(X, y));
         try!(check_valid_labels(y));

-        self.fit_sigmoid(X, y)
-    }
+        let rows_per_thread = X.rows() / num_threads + 1;
+        let num_components = self.num_components;

-    fn decision_function(&self, X: &SparseRowArray) -> Result<Array, &'static str> {
+        let model_ptr = unsafe { &*(self as *const FactorizationMachine) };

-        try!(check_data_dimensionality(self.dim, X));
+        crossbeam::scope(|scope| {
+            for thread_num in 0..num_threads {
+                scope.spawn(move || {

-        let mut data = Vec::with_capacity(X.rows());
+                    let start = thread_num * rows_per_thread;
+                    let stop = cmp::min((thread_num + 1) * rows_per_thread, X.rows());

-        let mut component_sum = &mut vec![0.0; self.num_components][..];
+                    let mut component_sum = vec![0.0; num_components];

-        for row in X.iter_rows() {
-            let prediction = self.compute_prediction(&row, component_sum);
-            data.push(sigmoid(prediction));
-        }
+                    let model = unsafe {
+                        &mut *(model_ptr as *const FactorizationMachine
+                               as *mut FactorizationMachine)
+                    };

-        Ok(Array::from(data))
+                    for (row, &true_y) in X.iter_rows_range(start..stop)
+                                           .zip(y.data()[start..stop].iter()) {
+                        let y_hat = sigmoid(model.compute_prediction(&row, &mut component_sum[..]));
+                        let loss = logistic_loss(true_y, y_hat);
+                        model.update(row, loss, &mut component_sum[..]);
+                        model.accumulate_regularization();
+                    }
+                });
+            }
+        });
+
+        self.regularize_all();
+
+        Ok(())
     }
 }
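The `fit_parallel` implementation above is Hogwild-style asynchronous SGD: every worker thread writes to the same parameters through a raw pointer, and the resulting data races are accepted as benign because individual updates are sparse. Stripped of the model specifics, the sharing pattern looks like this (a sketch with a toy `Model` type, not crate code):

```rust
extern crate crossbeam;

struct Model {
    weights: Vec<f32>,
}

fn hogwild_step(model: &mut Model, num_threads: usize) {
    // Smuggle the &mut across threads as an address; each worker
    // reconstitutes it and mutates the weights without locking.
    let model_ptr = model as *mut Model as usize;

    crossbeam::scope(|scope| {
        for _ in 0..num_threads {
            scope.spawn(move || {
                let model = unsafe { &mut *(model_ptr as *mut Model) };
                for w in model.weights.iter_mut() {
                    *w += 1.0; // stands in for a gradient update
                }
            });
        }
    });
}
```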
@@ -430,6 +470,7 @@ mod tests {
     use cross_validation::cross_validation::CrossValidation;
     use datasets::iris::load_data;
     use metrics::accuracy_score;
+    use multiclass::OneVsRest;

     #[cfg(feature = "all_tests")]
     use datasets::newsgroups;
@@ -546,6 +587,56 @@ mod tests {
         assert!(test_accuracy > 0.94);
     }

+    #[test]
+    fn test_iris_parallel() {
+        let (data, target) = load_data();
+
+        // Get a binary target so that the parallelism
+        // goes through the FM model and not through the
+        // OvR wrapper.
+        let (_, target) = OneVsRest::split(&target).next().unwrap();
+
+        let mut test_accuracy = 0.0;
+        let mut train_accuracy = 0.0;
+
+        let no_splits = 10;
+
+        let mut cv = CrossValidation::new(data.rows(), no_splits);
+        cv.set_rng(StdRng::from_seed(&[100]));
+
+        for (train_idx, test_idx) in cv {
+
+            let x_train = data.get_rows(&train_idx);
+            let x_test = data.get_rows(&test_idx);
+
+            let y_train = target.get_rows(&train_idx);
+
+            let mut model = Hyperparameters::new(data.cols(), 5)
+                                .learning_rate(0.05)
+                                .l2_penalty(0.0)
+                                .rng(StdRng::from_seed(&[100]))
+                                .build();
+
+            for _ in 0..20 {
+                model.fit_parallel(&x_train, &y_train, 4).unwrap();
+            }
+
+            let y_hat = model.predict(&x_test).unwrap();
+            let y_hat_train = model.predict(&x_train).unwrap();
+
+            test_accuracy += accuracy_score(&target.get_rows(&test_idx), &y_hat);
+            train_accuracy += accuracy_score(&target.get_rows(&train_idx), &y_hat_train);
+        }
+
+        test_accuracy /= no_splits as f32;
+        train_accuracy /= no_splits as f32;
+
+        println!("Accuracy {}", test_accuracy);
+        println!("Train accuracy {}", train_accuracy);
+
+        assert!(test_accuracy > 0.94);
+    }
+
     #[test]
     #[cfg(feature = "all_tests")]
     fn test_fm_newsgroups() {
@@ -603,12 +694,13 @@ mod bench {

     use test::Bencher;

-    use rustlearn::prelude::*;
+    use prelude::*;

-    use rustlearn::cross_validation::cross_validation::CrossValidation;
-    use rustlearn::datasets::iris::load_data;
-    use rustlearn::metrics::{accuracy_score, roc_auc_score};
-    use rustlearn::multiclass::OneVsRest;
+    use cross_validation::cross_validation::CrossValidation;
+    use datasets::iris::load_data;
+    use datasets::newsgroups;
+    use metrics::{accuracy_score, roc_auc_score};
+    use multiclass::OneVsRest;

     use super::*;

@@ -629,4 +721,40 @@ mod bench {
             model.fit(&sparse_data, &target).unwrap();
         });
     }
+
+    #[bench]
+    fn bench_fm_newsgroups(b: &mut Bencher) {
+
+        let (X, target) = newsgroups::load_data();
+        let (_, target) = OneVsRest::split(&target).next().unwrap();
+
+        let X = X.get_rows(&(..500));
+        let target = target.get_rows(&(..500));
+
+        let mut model = Hyperparameters::new(X.cols(), 10)
+                            .rng(StdRng::from_seed(&[100]))
+                            .build();
+
+        b.iter(|| {
+            model.fit(&X, &target).unwrap();
+        });
+    }
+
+    #[bench]
+    fn bench_fm_newsgroups_parallel(b: &mut Bencher) {
+
+        let (X, target) = newsgroups::load_data();
+        let (_, target) = OneVsRest::split(&target).next().unwrap();
+
+        let X = X.get_rows(&(..500));
+        let target = target.get_rows(&(..500));
+
+        let mut model = Hyperparameters::new(X.cols(), 10)
+                            .rng(StdRng::from_seed(&[100]))
+                            .build();
+
+        b.iter(|| {
+            model.fit_parallel(&X, &target, 2).unwrap();
+        });
+    }
 }
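Put together, parallel training of a factorization machine looks like the test above. A condensed sketch, assuming the module path `rustlearn::factorization::factorization_machines` and that the parallel traits are reachable through the prelude:

```rust
use rustlearn::prelude::*;
use rustlearn::datasets::iris;
use rustlearn::factorization::factorization_machines::Hyperparameters;
use rustlearn::multiclass::OneVsRest;

fn main() {
    let (data, target) = iris::load_data();

    // Binarize the target so the Hogwild path in the FM itself is
    // exercised, rather than the one-vs-rest wrapper.
    let (_, target) = OneVsRest::split(&target).next().unwrap();

    let mut model = Hyperparameters::new(data.cols(), 5)
                        .learning_rate(0.05)
                        .build();

    // Several epochs of asynchronous SGD over four threads.
    for _ in 0..20 {
        model.fit_parallel(&data, &target, 4).unwrap();
    }

    let predictions = model.predict(&data).unwrap();
    assert!(predictions.rows() == data.rows());
}
```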
diff --git a/src/lib.rs b/src/lib.rs
index b69a61c..2f2acd7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,8 +3,8 @@
 //!
 //! # Introduction
 //!
-//! This crate is mostly an excuse for me to learn Rust. Nevertheless, it contains
-//! reasonably effective implementations of a number of common machine learing algorithms.
+//! This crate contains reasonably effective implementations
+//! of a number of common machine learning algorithms.
 //!
 //! At the moment, `rustlearn` uses its own basic dense and sparse array types, but I will be happy
 //! to use something more robust once a clear winner in that space emerges.
@@ -37,6 +37,10 @@
 //! - [accuracy](metrics/fn.accuracy_score.html)
 //! - [ROC AUC score](metrics/ranking/fn.roc_auc_score.html)
 //!
+//! ## Parallelization
+//!
+//! A number of models support both parallel model fitting and prediction.
+//!
 //! ## Model serialization
 //!
 //! Model serialization is supported via `rustc_serialize`. This will probably change to `serde` once
@@ -150,6 +154,7 @@
 extern crate csv;
 extern crate rand;
 extern crate rustc_serialize;
+extern crate crossbeam;

 pub mod array;
diff --git a/src/linear_models/sgdclassifier.rs b/src/linear_models/sgdclassifier.rs
index 073c032..8715fe9 100644
--- a/src/linear_models/sgdclassifier.rs
+++ b/src/linear_models/sgdclassifier.rs
@@ -57,6 +57,8 @@
 use std::iter::Iterator;

+use crossbeam;
+
 use prelude::*;

 use multiclass::OneVsRestWrapper;
@@ -199,7 +201,7 @@ macro_rules! min {
 }

-impl SupervisedModel<Array> for SGDClassifier {
+impl<'a> SupervisedModel<&'a Array> for SGDClassifier {
     fn fit(&mut self, X: &Array, y: &Array) -> Result<(), &'static str> {

         try!(check_data_dimensionality(self.dim, X));
@@ -234,7 +236,42 @@ impl SupervisedModel<Array> for SGDClassifier {
 }

+impl<'a> ParallelPredict<&'a Array> for SGDClassifier {
+    fn decision_function_parallel(&self,
+                                  X: &Array,
+                                  num_threads: usize)
+                                  -> Result<Array, &'static str> {
+        try!(check_data_dimensionality(self.dim, X));
+
+        let mut data = vec![0.0; X.rows()];
+        let rows_per_thread = X.rows() / num_threads + 1;
+
+        crossbeam::scope(|scope| {
+
+            let data_chunks = data.chunks_mut(rows_per_thread).collect::<Vec<_>>();
+            let mut row_bounds = Vec::new();
+
+            let mut chunk_start = 0;
+
+            for chunk in &data_chunks {
+                row_bounds.push((chunk_start, chunk_start + chunk.len()));
+                chunk_start += chunk.len();
+            }
+
+            for (&(start, stop), out_chunk) in row_bounds.iter().zip(data_chunks) {
+                scope.spawn(move || {
+                    for (row_number, out) in (start..stop).zip(out_chunk) {
+                        *out = self.compute_prediction(&X.view_row(row_number));
+                    }
+                });
+            }
+        });
+
+        Ok(Array::from(data))
+    }
+}
+

-impl SupervisedModel<SparseRowArray> for SGDClassifier {
+impl<'a> SupervisedModel<&'a SparseRowArray> for SGDClassifier {
     fn fit(&mut self, X: &SparseRowArray, y: &Array) -> Result<(), &'static str> {

         try!(check_data_dimensionality(self.dim, X));
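`decision_function_parallel` above partitions a preallocated output buffer into contiguous per-thread slices (note the two fixes relative to the draft: the buffer is allocated at full length rather than merely reserved, and it is chunked by rows-per-thread rather than by thread count). The bound bookkeeping in isolation, as plain Rust:

```rust
use std::cmp;

// Split `rows` into contiguous (start, stop) chunks of at most
// `rows / num_threads + 1` rows each, mirroring the chunking above.
fn chunk_bounds(rows: usize, num_threads: usize) -> Vec<(usize, usize)> {
    let rows_per_thread = rows / num_threads + 1;
    let mut bounds = Vec::new();
    let mut start = 0;

    while start < rows {
        let stop = cmp::min(start + rows_per_thread, rows);
        bounds.push((start, stop));
        start = stop;
    }

    bounds
}

fn main() {
    // 10 rows over 3 threads: chunks of at most 4 rows.
    assert_eq!(chunk_bounds(10, 3), vec![(0, 4), (4, 8), (8, 10)]);
}
```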
impl_multiclass_supervised_model { + ($t:ty) => { + impl<'a, T: SupervisedModel<&'a $t> + Clone> SupervisedModel<&'a $t> for OneVsRestWrapper { + fn fit(&mut self, X: &'a $t, y: &Array) -> Result<(), &'static str> { - for (class_label, binary_target) in OneVsRest::split(y) { + for (class_label, binary_target) in OneVsRest::split(y) { - let model = self.get_model(class_label); - try!(model.fit(X, &binary_target)); - } - Ok(()) - } + let model = self.get_model(class_label); + try!(model.fit(X, &binary_target)); + } + Ok(()) + } - fn decision_function(&self, X: &Array) -> Result { + fn decision_function(&self, X: &'a $t) -> Result { - let mut out = Array::zeros(X.rows(), self.class_labels.len()); + let mut out = Array::zeros(X.rows(), self.class_labels.len()); - for (col_idx, model) in self.models.iter().enumerate() { - let values = try!(model.decision_function(X)); - for (row_idx, &val) in values.data().iter().enumerate() { - *out.get_mut(row_idx, col_idx) = val; + for (col_idx, model) in self.models.iter().enumerate() { + let values = try!(model.decision_function(X)); + for (row_idx, &val) in values.data().iter().enumerate() { + *out.get_mut(row_idx, col_idx) = val; + } + } + + Ok(out) } - } - Ok(out) - } + fn predict(&self, X: &'a $t) -> Result { - fn predict(&self, X: &Array) -> Result { + let decision = try!(self.decision_function(X)); + let mut predictions = Vec::with_capacity(X.rows()); - let decision = try!(self.decision_function(X)); - let mut predictions = Vec::with_capacity(X.rows()); + for row in decision.iter_rows() { - for row in decision.iter_rows() { + let mut max_value = f32::NEG_INFINITY; + let mut max_class = 0; - let mut max_value = f32::NEG_INFINITY; - let mut max_class = 0; + for (class_idx, val) in row.iter_nonzero() { + if val > max_value { + max_value = val; + max_class = class_idx; + } + } - for (class_idx, val) in row.iter_nonzero() { - if val > max_value { - max_value = val; - max_class = class_idx; + predictions.push(self.class_labels[max_class]); } - } - predictions.push(self.class_labels[max_class]); + Ok(Array::from(predictions)) + } } - - Ok(Array::from(predictions)) - } + }; } -impl SupervisedModel for OneVsRestWrapper - where T: SupervisedModel + Clone -{ - fn fit(&mut self, X: &SparseRowArray, y: &Array) -> Result<(), &'static str> { - for (class_label, binary_target) in OneVsRest::split(y) { - let model = self.get_model(class_label); - try!(model.fit(X, &binary_target)); - } - Ok(()) - } - - fn decision_function(&self, X: &SparseRowArray) -> Result { +macro_rules! 
impl_multiclass_parallel_predict { + ($t:ty) => { + impl<'a, T: SupervisedModel<&'a $t> + Clone + Sync> ParallelPredict<&'a $t> for OneVsRestWrapper { + fn decision_function_parallel(&self, X: &'a $t, num_threads: usize) + -> Result { - let mut out = Array::zeros(X.rows(), self.class_labels.len()); - - for (col_idx, model) in self.models.iter().enumerate() { - let values = try!(model.decision_function(X)); - for (row_idx, &val) in values.data().iter().enumerate() { - *out.get_mut(row_idx, col_idx) = val; - } - } - - Ok(out) - } + let mut out = Array::zeros(X.rows(), self.class_labels.len()); - fn predict(&self, X: &SparseRowArray) -> Result { + let numbered_models = self.models.iter().enumerate().collect::>(); - let decision = try!(self.decision_function(X)); - let mut predictions = Vec::with_capacity(X.rows()); + for slc in numbered_models.chunks(num_threads) { - for row in decision.iter_rows() { + let mut guards = Vec::new(); - let mut max_value = f32::NEG_INFINITY; - let mut max_class = 0; + crossbeam::scope(|scope| { + for &(col_idx, model) in slc { + guards.push(scope.spawn(move || { + (col_idx, model.decision_function(X)) + })); + } + }); - for (class_idx, val) in row.iter_nonzero() { - if val > max_value { - max_value = val; - max_class = class_idx; + for guard in guards.into_iter() { + let (col_idx, res) = guard.join(); + if res.is_ok() { + for (row_idx, &value) in res.unwrap().as_slice().iter().enumerate() { + out.set(row_idx, col_idx, value); + } + } else { + return res; + } + } } + + Ok(out) } - predictions.push(self.class_labels[max_class]); - } + fn predict_parallel(&self, X: &'a $t, num_threads: usize) -> Result { - Ok(Array::from(predictions)) - } -} + let decision = try!(self.decision_function_parallel(X, num_threads)); + let mut predictions = Vec::with_capacity(X.rows()); + for row in decision.iter_rows() { -impl SupervisedModel for OneVsRestWrapper - where T: SupervisedModel + Clone -{ - fn fit(&mut self, X: &SparseColumnArray, y: &Array) -> Result<(), &'static str> { - for (class_label, binary_target) in OneVsRest::split(y) { - let model = self.get_model(class_label); - try!(model.fit(X, &binary_target)); - } - Ok(()) - } + let mut max_value = f32::NEG_INFINITY; + let mut max_class = 0; - fn decision_function(&self, X: &SparseColumnArray) -> Result { + for (class_idx, val) in row.iter_nonzero() { + if val > max_value { + max_value = val; + max_class = class_idx; + } + } - let mut out = Array::zeros(X.rows(), self.class_labels.len()); + predictions.push(self.class_labels[max_class]); + } - for (col_idx, model) in self.models.iter().enumerate() { - let values = try!(model.decision_function(X)); - for (row_idx, &val) in values.data().iter().enumerate() { - *out.get_mut(row_idx, col_idx) = val; + Ok(Array::from(predictions)) } } + }; +} - Ok(out) - } - fn predict(&self, X: &SparseColumnArray) -> Result { +macro_rules! 
impl_multiclass_parallel_supervised { + ($t:ty) => { + impl<'a, T: SupervisedModel<&'a $t> + Clone + Sync + Send> ParallelSupervisedModel<&'a $t> for OneVsRestWrapper { + fn fit_parallel(&mut self, X: &'a $t, y: &Array, num_threads: usize) -> Result<(), &'static str> { - let decision = try!(self.decision_function(X)); - let mut predictions = Vec::with_capacity(X.rows()); + let mut ovr = OneVsRest::split(y); - for row in decision.iter_rows() { + loop { + let chunk = ovr.by_ref().take(num_threads).collect::>(); - let mut max_value = f32::NEG_INFINITY; - let mut max_class = 0; + if chunk.len() == 0 { + break; + } - for (class_idx, val) in row.iter_nonzero() { - if val > max_value { - max_value = val; - max_class = class_idx; + let mut guards = Vec::new(); + + crossbeam::scope(|scope| { + for (class_label, binary_target) in chunk { + let mut model = self.extract_model(class_label); + guards.push(scope.spawn(move || { + let result = model.fit(X, &binary_target); + (class_label, model, result) + })); + } + }); + + for guard in guards.into_iter() { + let (class_label, model, result) = guard.join(); + + if result.is_ok() { + self.class_labels.push(class_label); + self.models.push(model); + } else { + return result; + } + } } - } - predictions.push(self.class_labels[max_class]); + Ok(()) + } } - - Ok(Array::from(predictions)) } } + + +impl_multiclass_supervised_model!(Array); +impl_multiclass_supervised_model!(SparseRowArray); +impl_multiclass_supervised_model!(SparseColumnArray); + +impl_multiclass_parallel_predict!(Array); +impl_multiclass_parallel_predict!(SparseRowArray); +impl_multiclass_parallel_predict!(SparseColumnArray); + +impl_multiclass_parallel_supervised!(Array); +impl_multiclass_parallel_supervised!(SparseRowArray); +impl_multiclass_parallel_supervised!(SparseColumnArray); diff --git a/src/svm/libsvm/svc.rs b/src/svm/libsvm/svc.rs index 76af4d3..d4cb421 100644 --- a/src/svm/libsvm/svc.rs +++ b/src/svm/libsvm/svc.rs @@ -93,7 +93,7 @@ pub struct SVC { macro_rules! impl_supervised_model { ($x_type:ty) => { - impl SupervisedModel<$x_type> for SVC { + impl<'a> SupervisedModel<&'a $x_type> for SVC { fn fit(&mut self, X: &$x_type, y: &Array) -> Result<(), &'static str> { try!(check_data_dimensionality(self.dim, X)); diff --git a/src/traits/mod.rs b/src/traits/mod.rs index b47bc70..c2dff15 100644 --- a/src/traits/mod.rs +++ b/src/traits/mod.rs @@ -5,10 +5,11 @@ use std::cmp::Ordering; use array::dense::*; +/// Trait describing supervised models. pub trait SupervisedModel { - fn fit(&mut self, X: &T, y: &Array) -> Result<(), &'static str>; - fn decision_function(&self, X: &T) -> Result; - fn predict(&self, x: &T) -> Result { + fn fit(&mut self, X: T, y: &Array) -> Result<(), &'static str>; + fn decision_function(&self, X: T) -> Result; + fn predict(&self, x: T) -> Result { let decision_func = try!(self.decision_function(x)); @@ -23,3 +24,29 @@ pub trait SupervisedModel { .collect::>())) } } + + +/// Applies to models capable of making predictions in a parallel fashion. +pub trait ParallelPredict { + fn decision_function_parallel(&self, X: T, num_threads: usize) -> Result; + fn predict_parallel(&self, X: T, num_threads: usize) -> Result { + + let decision_func = try!(self.decision_function_parallel(X, num_threads)); + + Ok(Array::from(decision_func.data() + .iter() + .map(|v| { + match v.partial_cmp(&0.5) { + Some(Ordering::Greater) => 1.0, + _ => 0.0, + } + }) + .collect::>())) + } +} + + +/// Applies to models capable of being trained in a parallel fashion. 
diff --git a/src/traits/mod.rs b/src/traits/mod.rs
index b47bc70..c2dff15 100644
--- a/src/traits/mod.rs
+++ b/src/traits/mod.rs
@@ -5,10 +5,11 @@
 use std::cmp::Ordering;

 use array::dense::*;

+/// Trait describing supervised models.
 pub trait SupervisedModel<T> {
-    fn fit(&mut self, X: &T, y: &Array) -> Result<(), &'static str>;
-    fn decision_function(&self, X: &T) -> Result<Array, &'static str>;
-    fn predict(&self, x: &T) -> Result<Array, &'static str> {
+    fn fit(&mut self, X: T, y: &Array) -> Result<(), &'static str>;
+    fn decision_function(&self, X: T) -> Result<Array, &'static str>;
+    fn predict(&self, x: T) -> Result<Array, &'static str> {

         let decision_func = try!(self.decision_function(x));

@@ -23,3 +24,29 @@ pub trait SupervisedModel<T> {
             .collect::<Vec<f32>>()))
     }
 }
+
+
+/// Applies to models capable of making predictions in a parallel fashion.
+pub trait ParallelPredict<T> {
+    fn decision_function_parallel(&self, X: T, num_threads: usize) -> Result<Array, &'static str>;
+    fn predict_parallel(&self, X: T, num_threads: usize) -> Result<Array, &'static str> {
+
+        let decision_func = try!(self.decision_function_parallel(X, num_threads));
+
+        Ok(Array::from(decision_func.data()
+                                    .iter()
+                                    .map(|v| {
+                                        match v.partial_cmp(&0.5) {
+                                            Some(Ordering::Greater) => 1.0,
+                                            _ => 0.0,
+                                        }
+                                    })
+                                    .collect::<Vec<f32>>()))
+    }
+}
+
+
+/// Applies to models capable of being trained in a parallel fashion.
+pub trait ParallelSupervisedModel<T> {
+    fn fit_parallel(&mut self, X: T, y: &Array, num_threads: usize) -> Result<(), &'static str>;
+}
diff --git a/src/trees/decision_tree.rs b/src/trees/decision_tree.rs
index aa3c627..219a846 100644
--- a/src/trees/decision_tree.rs
+++ b/src/trees/decision_tree.rs
@@ -321,7 +321,7 @@ pub struct DecisionTree {
 }

-impl SupervisedModel<Array> for DecisionTree {
+impl<'a> SupervisedModel<&'a Array> for DecisionTree {
     fn fit(&mut self, X: &Array, y: &Array) -> Result<(), &'static str> {

         try!(check_data_dimensionality(self.dim, X));
@@ -363,7 +363,7 @@ impl SupervisedModel<Array> for DecisionTree {
 }

-impl SupervisedModel<SparseColumnArray> for DecisionTree {
+impl<'a> SupervisedModel<&'a SparseColumnArray> for DecisionTree {
     fn fit(&mut self, X: &SparseColumnArray, y: &Array) -> Result<(), &'static str> {

         try!(check_data_dimensionality(self.dim, X));
@@ -1181,6 +1181,54 @@ mod tests {
         assert!(train_accuracy > 0.95);
     }

+    #[test]
+    #[cfg(feature = "all_tests")]
+    fn test_decision_tree_newsgroups_parallel() {
+
+        let (X, target) = newsgroups::load_data();
+
+        let no_splits = 2;
+
+        let mut test_accuracy = 0.0;
+        let mut train_accuracy = 0.0;
+
+        let mut cv = CrossValidation::new(X.rows(), no_splits);
+        cv.set_rng(StdRng::from_seed(&[100]));
+
+        for (train_idx, test_idx) in cv {
+
+            let x_train = SparseColumnArray::from(&X.get_rows(&train_idx));
+
+            let x_test = SparseColumnArray::from(&X.get_rows(&test_idx));
+            let y_train = target.get_rows(&train_idx);
+
+            let mut model = Hyperparameters::new(X.cols())
+                                .min_samples_split(5)
+                                .rng(StdRng::from_seed(&[100]))
+                                .one_vs_rest();
+
+            let start = time::precise_time_ns();
+            for _ in 0..2 {
+                model.fit_parallel(&x_train, &y_train, 2).unwrap();
+            }
+            println!("Elapsed {}", time::precise_time_ns() - start);
+
+            let y_hat = model.predict_parallel(&x_test, 2).unwrap();
+            let y_hat_train = model.predict_parallel(&x_train, 2).unwrap();
+
+            test_accuracy += accuracy_score(&target.get_rows(&test_idx), &y_hat);
+
+            train_accuracy += accuracy_score(&target.get_rows(&train_idx), &y_hat_train);
+        }
+
+        test_accuracy /= no_splits as f32;
+        train_accuracy /= no_splits as f32;
+        println!("{}", test_accuracy);
+        println!("train accuracy {}", train_accuracy);
+
+        assert!(train_accuracy > 0.95);
+    }
 }

@@ -1191,6 +1239,7 @@ mod bench {

     use prelude::*;

     use datasets::iris::load_data;
+    use datasets::newsgroups;

     use super::Hyperparameters;
     use rand::{Rng, StdRng, SeedableRng};
@@ -1341,6 +1390,7 @@ mod bench {

         let (X, target) = newsgroups::load_data();

         let x_train = SparseColumnArray::from(&X.get_rows(&(..500)));
+        let target = target.get_rows(&(..500));

         let mut model = Hyperparameters::new(X.cols())
                             .min_samples_split(5)
                             .rng(StdRng::from_seed(&[100]))
                             .one_vs_rest();

         b.iter(|| {
             model.fit(&x_train, &target).unwrap();
         });
     }
+
+    #[bench]
+    fn bench_decision_tree_newsgroups_parallel(b: &mut Bencher) {
+
+        let (X, target) = newsgroups::load_data();
+
+        let x_train = SparseColumnArray::from(&X.get_rows(&(..500)));
+        let target = target.get_rows(&(..500));
+
+        let mut model = Hyperparameters::new(X.cols())
+                            .min_samples_split(5)
+                            .rng(StdRng::from_seed(&[100]))
+                            .one_vs_rest();
+
+        b.iter(|| {
+            model.fit_parallel(&x_train, &target, 2).unwrap();
+        });
+    }
 }
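Finally, the new decision tree benchmark condensed into a runnable sketch. This assumes the `bench`/`all_tests` feature wiring from earlier in this diff so that the newsgroups data is available; the 500-row slice and thread count are illustrative:

```rust
use rustlearn::prelude::*;
use rustlearn::datasets::newsgroups;
use rustlearn::trees::decision_tree::Hyperparameters;

fn main() {
    let (X, target) = newsgroups::load_data();

    let x_train = SparseColumnArray::from(&X.get_rows(&(..500)));
    let target = target.get_rows(&(..500));

    let mut model = Hyperparameters::new(X.cols())
                        .min_samples_split(5)
                        .one_vs_rest();

    // One decision tree per class label, trained two at a time.
    model.fit_parallel(&x_train, &target, 2).unwrap();
    let predictions = model.predict_parallel(&x_train, 2).unwrap();

    assert!(predictions.rows() == x_train.rows());
}
```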