diff --git a/NAMESPACE b/NAMESPACE index b8aec00..fa295b3 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -28,6 +28,7 @@ import(RcppProgress) import(purrr) importFrom(Rcpp,sourceCpp) importFrom(RcppParallel,RcppParallelLibs) +importFrom(RcppParallel,setThreadOptions) importFrom(ggplot2,aes) importFrom(ggplot2,coord_cartesian) importFrom(ggplot2,geom_point) diff --git a/R/mcmc.R b/R/mcmc.R index 20ed7ca..3c5fcb6 100644 --- a/R/mcmc.R +++ b/R/mcmc.R @@ -1,5 +1,7 @@ #' Sample from the target distribution using MCMC #' +#' @importFrom RcppParallel setThreadOptions +#' #' @export #' #' @param data Data to be used in MCMC, as generated by the `load_*_data` functions @@ -38,8 +40,8 @@ #' the latent genotypes at each step of the MCMC. WARNING: This will increase #' the size of the output object significantly. #' @param num_chains Total number of chains to run, possibly simultaneously -#' @param num_cores Total OMP parallel threads to use to run chains. -#' num_cores * pt_num_threads should not exceed the number of cores available +#' @param num_cores Total parallel cores to use to run algorithm. +#' num_cores * num_chains should not exceed the number of cores available #' on your system. #' @param pt_chains Total number of chains to run with parallel tempering or a #' vector containing the temperatures that should be used for parallel tempering. @@ -47,10 +49,6 @@ #' results in evenly distributed temperatures between \[0,1\], below 1 will bias #' towards 1 and above 1 will bias towards 0. Only used if pt_chains is a #' single value (i.e. not a vector). -#' @param pt_num_threads Total number of OMP parallel threads to be used to -#' process parallel tempered chains -#' num_cores * pt_num_threads should not exceed the number of cores available -#' on your system. #' @param adapt_temp Logical indicating whether or not to adapt the parallel #' tempering temperatures. If TRUE, the temperatures will be adapted during the #' `burnin` period, starting after `pre_adapt_steps` steps. The adaptation will @@ -87,11 +85,11 @@ run_mcmc <- num_cores = 1, pt_chains = 1, pt_grad = 1, - pt_num_threads = 1, adapt_temp = TRUE, pre_adapt_steps = 25, temp_adapt_steps = 25, max_initialization_tries = 10000) { + RcppParallel::setThreadOptions(numThreads = num_cores) start_time <- Sys.time() args <- as.list(environment()) mcmc_args <- as.list(environment()) diff --git a/data-raw/mcmc_results.R b/data-raw/mcmc_results.R index 5dacb24..7f79dce 100644 --- a/data-raw/mcmc_results.R +++ b/data-raw/mcmc_results.R @@ -27,12 +27,12 @@ simulated_data <- moire::simulate_data( burnin <- 1e3 num_samples <- 1e3 pt_chains <- seq(1, 0, length.out = 80) -pt_num_threads <- 20 # number of threads to use for parallel tempering +num_cores <- parallelly::availableCores() - 1 # number of threads to use for parallel processing mcmc_results <- moire::run_mcmc( simulated_data, verbose = TRUE, burnin = burnin, samples_per_chain = num_samples, - pt_chains = pt_chains, pt_num_threads = pt_num_threads, + pt_chains = pt_chains, num_cores = num_cores, adapt_temp = TRUE ) diff --git a/data/mcmc_results.rda b/data/mcmc_results.rda index 7121783..462d725 100644 Binary files a/data/mcmc_results.rda and b/data/mcmc_results.rda differ diff --git a/data/simulated_data.rda b/data/simulated_data.rda index f6fb384..846d5af 100644 Binary files a/data/simulated_data.rda and b/data/simulated_data.rda differ diff --git a/man/run_mcmc.Rd b/man/run_mcmc.Rd index 11bec87..8467971 100644 --- a/man/run_mcmc.Rd +++ b/man/run_mcmc.Rd @@ -29,7 +29,6 @@ run_mcmc( num_cores = 1, pt_chains = 1, pt_grad = 1, - pt_num_threads = 1, adapt_temp = TRUE, pre_adapt_steps = 25, temp_adapt_steps = 25, @@ -94,8 +93,8 @@ the size of the output object significantly.} \item{num_chains}{Total number of chains to run, possibly simultaneously} -\item{num_cores}{Total OMP parallel threads to use to run chains. -num_cores * pt_num_threads should not exceed the number of cores available +\item{num_cores}{Total parallel cores to use to run algorithm. +num_cores * num_chains should not exceed the number of cores available on your system.} \item{pt_chains}{Total number of chains to run with parallel tempering or a @@ -106,11 +105,6 @@ results in evenly distributed temperatures between [0,1], below 1 will bias towards 1 and above 1 will bias towards 0. Only used if pt_chains is a single value (i.e. not a vector).} -\item{pt_num_threads}{Total number of OMP parallel threads to be used to -process parallel tempered chains -num_cores * pt_num_threads should not exceed the number of cores available -on your system.} - \item{adapt_temp}{Logical indicating whether or not to adapt the parallel tempering temperatures. If TRUE, the temperatures will be adapted during the \code{burnin} period, starting after \code{pre_adapt_steps} steps. The adaptation will diff --git a/src/Makevars b/src/Makevars index dfffc7b..714ce9b 100644 --- a/src/Makevars +++ b/src/Makevars @@ -7,6 +7,4 @@ ifdef ENABLE_PROFILER PKG_CXXFLAGS+=-DENABLE_PROFILER endif -PKG_CXXFLAGS+=$(SHLIB_OPENMP_CXXFLAGS) -PKG_LIBS+=$(SHLIB_OPENMP_CXXFLAGS) PKG_LIBS+=$(shell ${R_HOME}/bin/Rscript -e "RcppParallel::RcppParallelLibs()") diff --git a/src/chain.cpp b/src/chain.cpp index 64b46f5..3dc120b 100644 --- a/src/chain.cpp +++ b/src/chain.cpp @@ -969,7 +969,6 @@ float Chain::calc_new_likelihood() genotyping_llik_new.end(), 0.0f); #else float sum = 0.0f; - #pragma omp simd reduction(+:sum) for (int i = 0; i < genotyping_llik_new.size(); ++i) { sum += genotyping_llik_new[i]; } @@ -991,19 +990,15 @@ float Chain::calc_new_prior() mean_coi_hyper_prior_new; #else float sum = 0.0f; - #pragma omp simd reduction(+:sum) for (int i = 0; i < eps_neg_prior_new.size(); ++i) { sum += eps_neg_prior_new[i]; } - #pragma omp simd reduction(+:sum) for (int i = 0; i < eps_pos_prior_new.size(); ++i) { sum += eps_pos_prior_new[i]; } - #pragma omp simd reduction(+:sum) for (int i = 0; i < relatedness_prior_new.size(); ++i) { sum += relatedness_prior_new[i]; } - #pragma omp simd reduction(+:sum) for (int i = 0; i < coi_prior_new.size(); ++i) { sum += coi_prior_new[i]; } diff --git a/src/mcmc.cpp b/src/mcmc.cpp index 3c46f87..aaf703a 100644 --- a/src/mcmc.cpp +++ b/src/mcmc.cpp @@ -5,18 +5,7 @@ #include -#ifdef _OPENMP -#include -#else -#define omp_get_num_threads() 1 -#define omp_get_thread_num() 0 -#define omp_get_max_threads() 1 -#define omp_get_thread_limit() 1 -#define omp_get_num_procs() 1 -#define omp_set_nested(a) -#define omp_set_num_threads(a) -#define omp_get_wtime() 0 -#endif +#include MCMC::MCMC(GenotypingData genotyping_data, Parameters params) : genotyping_data(genotyping_data), params(params) @@ -35,34 +24,33 @@ MCMC::MCMC(GenotypingData genotyping_data, Parameters params) swap_barriers.resize(params.pt_chains.size() - 1, 0.0); swap_indices.resize(params.pt_chains.size(), 0); - omp_set_num_threads(params.pt_num_threads); - - chains.resize(params.pt_chains.size()); std::vector any_ill_conditioned(params.pt_chains.size(), false); temp_gradient = params.pt_chains; - #pragma omp parallel for - for (size_t i = 0; i < params.pt_chains.size(); i++) + tbb::parallel_for(tbb::blocked_range(0, params.pt_chains.size()), [&](const tbb::blocked_range& range) { - if (std::any_of(any_ill_conditioned.begin(), any_ill_conditioned.end(), [](bool v) { return v; })) + for (size_t i = range.begin(); i < range.end(); ++i) { - continue; - } + if (std::any_of(any_ill_conditioned.begin(), any_ill_conditioned.end(), [](bool v) { return v; })) + { + continue; + } float temp = params.pt_chains[i]; chains[i] = Chain(genotyping_data, params, temp); bool ill_conditioned = std::isnan(chains[i].get_llik()); int max_tries = params.max_initialization_tries; - while (ill_conditioned and max_tries > 0) - { - chains[i].initialize_parameters(); - max_tries--; - ill_conditioned = std::isnan(chains[i].get_llik()); - } + while (ill_conditioned and max_tries > 0) + { + chains[i].initialize_parameters(); + max_tries--; + ill_conditioned = std::isnan(chains[i].get_llik()); + } - any_ill_conditioned[i] = ill_conditioned; - } + any_ill_conditioned[i] = ill_conditioned; + } + }); if (std::any_of(any_ill_conditioned.begin(), any_ill_conditioned.end(), [](bool v) { return v; })) { @@ -74,22 +62,24 @@ MCMC::MCMC(GenotypingData genotyping_data, Parameters params) void MCMC::burnin(int step) { -#pragma omp parallel for - for (auto &chain : chains) + tbb::parallel_for(tbb::blocked_range(0, chains.size()), [&](const tbb::blocked_range& range) { - chain.update_eps_neg(step); - chain.update_eps_pos(step); - chain.update_p(step); - chain.update_m(step); - if (params.allow_relatedness) + for (size_t i = range.begin(); i < range.end(); ++i) { - chain.update_r(step); - // chain.update_m_r(step); - chain.update_eff_coi(step); + auto& chain = chains[i]; + chain.update_eps_neg(step); + chain.update_eps_pos(step); + chain.update_p(step); + chain.update_m(step); + if (params.allow_relatedness) + { + chain.update_r(step); + chain.update_eff_coi(step); + } + chain.update_samples(step); + chain.update_mean_coi(step); } - chain.update_samples(step); - chain.update_mean_coi(step); - } + }); llik_burnin.push_back(get_llik()); prior_burnin.push_back(get_prior()); posterior_burnin.push_back(get_posterior()); @@ -97,8 +87,7 @@ void MCMC::burnin(int step) if (chains.size() > 1) { swap_chains(step, true); - if (num_swaps % params.temp_adapt_steps == 0 and - step > params.pre_adapt_steps and params.adapt_temp) + if (num_swaps % params.temp_adapt_steps == 0 and step > params.pre_adapt_steps and params.adapt_temp) { adapt_temp(); } @@ -223,47 +212,51 @@ void MCMC::adapt_temp() void MCMC::sample(int step) { -#pragma omp parallel for - for (auto &chain : chains) + tbb::parallel_for(tbb::blocked_range(0, chains.size()), [&](const tbb::blocked_range& range) { - chain.update_eps_neg(params.burnin + step); - chain.update_eps_pos(params.burnin + step); - chain.update_p(params.burnin + step); - chain.update_m(params.burnin + step); - - if (params.allow_relatedness) + for (size_t i = range.begin(); i < range.end(); ++i) { - chain.update_r(params.burnin + step); - chain.update_eff_coi(params.burnin + step); - } - chain.update_samples(params.burnin + step); - chain.update_mean_coi(params.burnin + step); + auto& chain = chains[i]; + chain.update_eps_neg(params.burnin + step); + chain.update_eps_pos(params.burnin + step); + chain.update_p(params.burnin + step); + chain.update_m(params.burnin + step); - if ((params.thin == 0 or step % params.thin == 0) and - (chain.get_hot() or chains.size() == 1)) - { - for (size_t ii = 0; ii < genotyping_data.num_loci; ++ii) + if (params.allow_relatedness) { - p_store[ii].push_back(chain.p[ii]); + chain.update_r(params.burnin + step); + chain.update_eff_coi(params.burnin + step); } + chain.update_samples(params.burnin + step); + chain.update_mean_coi(params.burnin + step); - for (size_t jj = 0; jj < genotyping_data.num_samples; ++jj) + if ((params.thin == 0 || step % params.thin == 0) && + (chain.get_hot() || chains.size() == 1)) { - m_store[jj].push_back(chain.m[jj]); - eps_neg_store[jj].push_back(chain.eps_neg[jj]); - eps_pos_store[jj].push_back(chain.eps_pos[jj]); - r_store[jj].push_back(chain.r[jj]); - - if (params.record_latent_genotypes) { - for (size_t kk = 0; kk < genotyping_data.num_loci; ++kk) - { - latent_genotypes_store[jj][kk].push_back(chain.latent_genotypes_new[kk][jj]); + for (size_t ii = 0; ii < genotyping_data.num_loci; ++ii) + { + p_store[ii].push_back(chain.p[ii]); + } + + for (size_t jj = 0; jj < genotyping_data.num_samples; ++jj) + { + m_store[jj].push_back(chain.m[jj]); + eps_neg_store[jj].push_back(chain.eps_neg[jj]); + eps_pos_store[jj].push_back(chain.eps_pos[jj]); + r_store[jj].push_back(chain.r[jj]); + + if (params.record_latent_genotypes) { + for (size_t kk = 0; kk < genotyping_data.num_loci; ++kk) + { + latent_genotypes_store[jj][kk].push_back(chain.latent_genotypes_new[kk][jj]); + } } } + mean_coi_store.push_back(chain.mean_coi); } - mean_coi_store.push_back(chain.mean_coi); } - } + }); + llik_sample.push_back(get_llik()); prior_sample.push_back(get_prior()); posterior_sample.push_back(get_posterior()); diff --git a/src/mcmc_utils.h b/src/mcmc_utils.h index 7150ea5..8c95f00 100644 --- a/src/mcmc_utils.h +++ b/src/mcmc_utils.h @@ -388,7 +388,6 @@ inline float logSumExp(const std::vector &x) } float sum = 0; -#pragma omp simd reduction(+ : sum) for (size_t i = 0; i < x.size(); ++i) { sum += std::exp(x[i] - max_el); diff --git a/src/parameters.cpp b/src/parameters.cpp index 3642adb..1be6e4a 100644 --- a/src/parameters.cpp +++ b/src/parameters.cpp @@ -15,7 +15,6 @@ Parameters::Parameters(const Rcpp::List &args) burnin = UtilFunctions::r_to_int(args["burnin"]); samples = UtilFunctions::r_to_int(args["samples"]); pt_chains = UtilFunctions::r_to_vector_float(args["pt_chains"]); - pt_num_threads = UtilFunctions::r_to_int(args["pt_num_threads"]); adapt_temp = UtilFunctions::r_to_bool(args["adapt_temp"]); pre_adapt_steps = UtilFunctions::r_to_int(args["pre_adapt_steps"]); temp_adapt_steps = UtilFunctions::r_to_int(args["temp_adapt_steps"]); diff --git a/src/parameters.h b/src/parameters.h index 9d0d90e..ef28068 100644 --- a/src/parameters.h +++ b/src/parameters.h @@ -17,7 +17,6 @@ class Parameters int burnin; int samples; std::vector pt_chains; - int pt_num_threads; bool adapt_temp; int pre_adapt_steps; int temp_adapt_steps; diff --git a/vignettes/mcmc_demo.Rmd b/vignettes/mcmc_demo.Rmd index 73d3c18..3799343 100644 --- a/vignettes/mcmc_demo.Rmd +++ b/vignettes/mcmc_demo.Rmd @@ -54,6 +54,7 @@ simulated_data <- moire::simulate_data( ``` ```{r load_simulated_data, eval=TRUE, include=FALSE} +# actual simulated data is contained in the package as part of the build process simulated_data <- moire::simulated_data ``` @@ -63,17 +64,19 @@ Now that we have our data, let's go ahead and run the MCMC. Here we run 20 paral burnin <- 1e3 num_samples <- 1e3 pt_chains <- seq(1, 0, length.out = 80) -pt_num_threads <- 20 # number of threads to use for parallel tempering +num_cores <- parallelly::availableCores() - 1 # number of threads to use for parallel processing mcmc_results <- moire::run_mcmc( simulated_data, verbose = TRUE, burnin = burnin, samples_per_chain = num_samples, - pt_chains = pt_chains, pt_num_threads = pt_num_threads, + pt_chains = pt_chains, num_cores = num_cores, adapt_temp = TRUE ) ``` ```{r run_mcmc_load, eval = TRUE, include = FALSE} +# actual MCMC results are contained in the package as part of the build process. Otherwise, +# building the vignette will take a long time. mcmc_results <- moire::mcmc_results ``` After running the MCMC, we can access the draws from the posterior distribution and analyze. diff --git a/vignettes/namibia.Rmd b/vignettes/namibia.Rmd index 4647577..52903a4 100644 --- a/vignettes/namibia.Rmd +++ b/vignettes/namibia.Rmd @@ -20,9 +20,8 @@ knitr::opts_chunk$set( The following code reproduces our analysis of data from Namibia as described in our paper. ```{r namibia, eval=FALSE, include=TRUE} -# Download Nambia data from here https://doi.org/10.7554/eLife.43510.018 -# and save it as namibia_data.xlsx in the working directory - +# Data is included as part of the package. More information about the data can be found here: +# `?namibia_data` full_dat <- moire::namibia_data epi_dat <- full_dat |> @@ -33,17 +32,17 @@ all_hfs <- epi_dat |> dplyr::pull(HealthFacility) |> unique() -verbose <- F +verbose <- FALSE allow_relatedness <- TRUE -burnin <- 5e2 -num_samples <- 1e2 +burnin <- 5e4 +num_samples <- 1e4 r_alpha <- 1 r_beta <- 1 eps_pos_alpha <- 1 eps_pos_beta <- 1 eps_neg_alpha <- 1 eps_neg_beta <- 1 -num_threads <- parallelly::availableCores() - 1 +num_cores <- parallelly::availableCores() - 1 for (hf in all_hfs) { hf_dat <- full_dat |> @@ -55,7 +54,7 @@ for (hf in all_hfs) { hf_dat, hf_dat$is_missing, allow_relatedness = allow_relatedness, burnin = burnin, samples_per_chain = num_samples, - pt_chains = 40, pt_num_threads = num_threads, thin = 10, + pt_chains = 40, num_cores = num_cores, thin = 10, verbose = verbose, adapt_temp = TRUE, r_alpha = r_alpha, r_beta = r_beta, eps_pos_alpha = eps_pos_alpha, eps_pos_beta = eps_pos_beta, eps_neg_alpha = eps_neg_alpha, eps_neg_beta = eps_neg_beta