Skip to content

Commit

Permalink
add rcpp_flows_aggregate_pairwise fn for #229
Browse files Browse the repository at this point in the history
  • Loading branch information
mpadge committed Jun 5, 2024
1 parent 998352b commit c008b30
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 2 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: dodgr
Title: Distances on Directed Graphs
Version: 0.4.0.003
Version: 0.4.0.004
Authors@R: c(
person("Mark", "Padgham", , "mark.padgham@email.com", role = c("aut", "cre")),
person("Andreas", "Petutschnig", role = "aut"),
Expand Down
24 changes: 24 additions & 0 deletions R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,30 @@ rcpp_flows_aggregate_par <- function (graph, vert_map_in, fromi, toi_in, flows,
.Call (`_dodgr_rcpp_flows_aggregate_par`, graph, vert_map_in, fromi, toi_in, flows, norm_sums, tol, heap_type)
}

#' rcpp_flows_aggregate_pairwise
#'
#' Pairwise version of flows_aggregate_par. This R function is a thin wrapper:
#' it forwards all eight arguments, unchanged and in the same order, to the
#' compiled routine `_dodgr_rcpp_flows_aggregate_pairwise` via `.Call`.
#'
#' @param graph The data.frame holding the graph edges
#' @param vert_map_in map from <std::string> vertex ID to (0-indexed) integer
#' index of vertices
#' @param fromi Index into vert_map_in of vertex numbers
#' @param toi Index into vert_map_in of vertex numbers
#' @param flows Forwarded to the compiled routine; presumably the flow values
#' to be aggregated, one per (fromi, toi) pair -- TODO confirm expected shape.
#' @param norm_sums Forwarded to the compiled routine; presumably a logical
#' flag controlling normalisation of flow sums -- TODO confirm semantics.
#' @param tol Relative tolerance in terms of flows below which targets
#' (to-vertices) are not considered.
#' @param heap_type Forwarded to the compiled routine; presumably the name of
#' the heap implementation used for routing -- TODO confirm accepted values.
#'
#' @note The parallelisation is achieved by dumping the results of each thread
#' to a file, with aggregation performed at the end by simply reading back and
#' aggregating all files. There is no way to aggregate into a single vector
#' because threads have to be independent. The only danger with this approach
#' is that multiple threads may generate the same file names, but with names 10
#' characters long, that chance should be 1 / 62 ^ 10.
#'
#' NOTE(review): this wrapper performs no file I/O itself; confirm that the
#' note above still describes the compiled implementation.
#'
#' @noRd
rcpp_flows_aggregate_pairwise <- function (graph, vert_map_in, fromi, toi, flows, norm_sums, tol, heap_type) {
.Call (`_dodgr_rcpp_flows_aggregate_pairwise`, graph, vert_map_in, fromi, toi, flows, norm_sums, tol, heap_type)
}

#' rcpp_flows_disperse_par
#'
#' Modified version of \code{rcpp_flows_aggregate} that aggregates flows to all
Expand Down
2 changes: 1 addition & 1 deletion codemeta.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"codeRepository": "/~https://github.com/UrbanAnalyst/dodgr",
"issueTracker": "/~https://github.com/UrbanAnalyst/dodgr/issues",
"license": "https://spdx.org/licenses/GPL-3.0",
"version": "0.4.0.003",
"version": "0.4.0.004",
"programmingLanguage": {
"@type": "ComputerLanguage",
"name": "R",
Expand Down
19 changes: 19 additions & 0 deletions src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
// rcpp_flows_aggregate_pairwise
//
// Forward declaration of the C++ implementation, followed by the C-linkage
// entry point `_dodgr_rcpp_flows_aggregate_pairwise` that takes eight SEXP
// arguments, converts each to the C++ type in the declaration below, calls
// the implementation, and returns the wrapped result as a SEXP.
//
// NOTE(review): this looks like generated glue (file is RcppExports.cpp);
// hand edits are presumably overwritten when the exports are regenerated --
// confirm before modifying by hand.
Rcpp::NumericVector rcpp_flows_aggregate_pairwise(const Rcpp::DataFrame graph, const Rcpp::DataFrame vert_map_in, Rcpp::IntegerVector fromi, Rcpp::IntegerVector toi, Rcpp::NumericVector flows, const bool norm_sums, const double tol, const std::string heap_type);
RcppExport SEXP _dodgr_rcpp_flows_aggregate_pairwise(SEXP graphSEXP, SEXP vert_map_inSEXP, SEXP fromiSEXP, SEXP toiSEXP, SEXP flowsSEXP, SEXP norm_sumsSEXP, SEXP tolSEXP, SEXP heap_typeSEXP) {
BEGIN_RCPP
// Holder for the value handed back to R.
Rcpp::RObject rcpp_result_gen;
// Scope object kept alive for the duration of the call.
Rcpp::RNGScope rcpp_rngScope_gen;
// Convert each incoming SEXP to the parameter type of the implementation.
Rcpp::traits::input_parameter< const Rcpp::DataFrame >::type graph(graphSEXP);
Rcpp::traits::input_parameter< const Rcpp::DataFrame >::type vert_map_in(vert_map_inSEXP);
Rcpp::traits::input_parameter< Rcpp::IntegerVector >::type fromi(fromiSEXP);
Rcpp::traits::input_parameter< Rcpp::IntegerVector >::type toi(toiSEXP);
Rcpp::traits::input_parameter< Rcpp::NumericVector >::type flows(flowsSEXP);
Rcpp::traits::input_parameter< const bool >::type norm_sums(norm_sumsSEXP);
Rcpp::traits::input_parameter< const double >::type tol(tolSEXP);
Rcpp::traits::input_parameter< const std::string >::type heap_type(heap_typeSEXP);
// Call the implementation and wrap its NumericVector result for R.
rcpp_result_gen = Rcpp::wrap(rcpp_flows_aggregate_pairwise(graph, vert_map_in, fromi, toi, flows, norm_sums, tol, heap_type));
return rcpp_result_gen;
END_RCPP
}
// rcpp_flows_disperse_par
Rcpp::NumericVector rcpp_flows_disperse_par(const Rcpp::DataFrame graph, const Rcpp::DataFrame vert_map_in, Rcpp::IntegerVector fromi, Rcpp::NumericVector k, Rcpp::NumericVector dens, const double& tol, std::string heap_type);
RcppExport SEXP _dodgr_rcpp_flows_disperse_par(SEXP graphSEXP, SEXP vert_map_inSEXP, SEXP fromiSEXP, SEXP kSEXP, SEXP densSEXP, SEXP tolSEXP, SEXP heap_typeSEXP) {
Expand Down Expand Up @@ -416,6 +434,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_dodgr_rcpp_deduplicate", (DL_FUNC) &_dodgr_rcpp_deduplicate, 5},
{"_dodgr_rcpp_aggregate_to_sf", (DL_FUNC) &_dodgr_rcpp_aggregate_to_sf, 3},
{"_dodgr_rcpp_flows_aggregate_par", (DL_FUNC) &_dodgr_rcpp_flows_aggregate_par, 8},
{"_dodgr_rcpp_flows_aggregate_pairwise", (DL_FUNC) &_dodgr_rcpp_flows_aggregate_pairwise, 8},
{"_dodgr_rcpp_flows_disperse_par", (DL_FUNC) &_dodgr_rcpp_flows_disperse_par, 7},
{"_dodgr_rcpp_flows_si", (DL_FUNC) &_dodgr_rcpp_flows_si, 10},
{"_dodgr_rcpp_fundamental_cycles", (DL_FUNC) &_dodgr_rcpp_fundamental_cycles, 2},
Expand Down
76 changes: 76 additions & 0 deletions src/flows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,82 @@ Rcpp::NumericVector rcpp_flows_aggregate_par (const Rcpp::DataFrame graph,
}


//' rcpp_flows_aggregate_pairwise
//'
//' Pairwise version of flows_aggregate_par: element `i` of `fromi` is paired
//' with element `i` of `toi`, so both vectors must have the same length.
//'
//' @param graph The data.frame holding the graph edges. The columns "from",
//' "to", "d", and "d_weighted" are read.
//' @param vert_map_in map from <std::string> vertex ID to (0-indexed) integer
//' index of vertices. The columns "vert" and "id" are read.
//' @param fromi Index into vert_map_in of vertex numbers
//' @param toi Index into vert_map_in of vertex numbers; must be the same
//' length as `fromi`.
//' @param flows Flow values handed to the parallel worker; presumably one
//' value per (fromi, toi) pair -- TODO confirm against OneAggregatePaired.
//' @param norm_sums Forwarded unchanged to the parallel worker; presumably
//' controls normalisation of flow sums -- TODO confirm.
//' @param tol Relative tolerance in terms of flows below which targets
//' (to-vertices) are not considered.
//' @param heap_type Forwarded unchanged to the parallel worker; presumably
//' the name of the heap implementation used for routing -- TODO confirm.
//'
//' @return The `output` member of the parallel worker after the reduction,
//' wrapped as a numeric vector; presumably aggregated flows per graph edge
//' -- TODO confirm against OneAggregatePaired.
//'
//' @note The work over the `n` pairs is distributed with
//' RcppParallel::parallelReduce, using the chunk size returned by
//' run_sp::get_chunk_size. Nothing in this function reads or writes files;
//' the result is taken from the worker's `output` member once the reduction
//' has finished.
//'
//' @noRd
// [[Rcpp::export]]
Rcpp::NumericVector rcpp_flows_aggregate_pairwise (const Rcpp::DataFrame graph,
        const Rcpp::DataFrame vert_map_in,
        Rcpp::IntegerVector fromi,
        Rcpp::IntegerVector toi,
        Rcpp::NumericVector flows,
        const bool norm_sums,
        const double tol,
        const std::string heap_type)
{
    if (fromi.size () != toi.size ())
        Rcpp::stop ("pairwise dists must have from.size == to.size");
    const long int n = fromi.size ();
    const size_t n_st = static_cast <size_t> (n);

    const std::vector <std::string> from = graph ["from"];
    const std::vector <std::string> to = graph ["to"];
    const std::vector <double> dist = graph ["d"];
    const std::vector <double> wt = graph ["d_weighted"];

    const size_t nedges = static_cast <size_t> (graph.nrow ());
    const std::vector <std::string> vert_name = vert_map_in ["vert"];
    const std::vector <size_t> vert_indx = vert_map_in ["id"];
    // Make map from vertex name to integer index
    std::map <std::string, size_t> vert_map_i;
    const size_t nverts = run_sp::make_vert_map (vert_map_in, vert_name,
            vert_indx, vert_map_i);

    // Both maps are filled by the helper; only the edge map is used below.
    std::unordered_map <std::string, size_t> verts_to_edge_map;
    std::unordered_map <std::string, double> verts_to_dist_map;
    run_sp::make_vert_to_edge_maps (from, to, wt, verts_to_edge_map,
            verts_to_dist_map);

    std::shared_ptr <DGraph> g = std::make_shared <DGraph> (nverts);
    inst_graph (g, nedges, vert_map_i, from, to, dist, wt);

    // Paired (fromi, toi) in a single vector: the first `n` entries hold
    // `fromi`, the following `n` entries hold `toi`. The loop index has the
    // same type as `n`, so the comparison and the `i + n` offset are never
    // computed through a narrower `int`.
    Rcpp::IntegerVector fromto (2 * n_st);
    for (long int i = 0; i < n; i++)
    {
        const size_t i_t = static_cast <size_t> (i);
        fromto [i] = fromi (i_t);
        fromto [i + n] = toi (i_t);
    }

    // Create parallel worker
    OneAggregatePaired oneAggregatePaired (RcppParallel::RVector <int> (fromto),
            RcppParallel::RVector <double> (flows), vert_name, verts_to_edge_map,
            n_st, nverts, nedges, norm_sums, tol, heap_type, g);

    const size_t chunk_size = run_sp::get_chunk_size (n_st);
    RcppParallel::parallelReduce (0, n_st, oneAggregatePaired, chunk_size);

    return Rcpp::wrap (oneAggregatePaired.output);
}



//' rcpp_flows_disperse_par
//'
Expand Down

0 comments on commit c008b30

Please sign in to comment.