diff --git a/include/core/NgenSimulation.hpp b/include/core/NgenSimulation.hpp index 00e5ef49eb..c9661d92ce 100644 --- a/include/core/NgenSimulation.hpp +++ b/include/core/NgenSimulation.hpp @@ -5,6 +5,7 @@ #include #include +#include namespace hy_features { @@ -12,6 +13,11 @@ namespace hy_features class HY_Features_MPI; } +namespace models::bmi +{ + class Bmi_Py_Adapter; +} + #include #include #include @@ -21,9 +27,16 @@ namespace hy_features class NgenSimulation { public: +#if NGEN_WITH_MPI + using hy_features_t = hy_features::HY_Features_MPI; +#else + using hy_features_t = hy_features::HY_Features; +#endif + NgenSimulation( Simulation_Time const& sim_time, std::vector> layers, + hy_features_t *features, std::unordered_map catchment_indexes, std::unordered_map nexus_indexes, int mpi_rank, @@ -33,12 +46,6 @@ class NgenSimulation ~NgenSimulation(); -#if NGEN_WITH_MPI - using hy_features_t = hy_features::HY_Features_MPI; -#else - using hy_features_t = hy_features::HY_Features; -#endif - /** * Run the catchment formulations for the full configured duration of the simulation * @@ -48,10 +55,11 @@ class NgenSimulation */ void run_catchments(); + void initialize_routing(std::string const& t_route_config_file_with_path); /** * Run t-route on the stored nexus outflow values for the full configured duration of the simulation */ - void run_routing(hy_features_t &features, std::string const& t_route_config_file_with_path); + void run_routing(); int get_nexus_index(std::string const& nexus_id) const; double get_nexus_outflow(int nexus_index, int timestep_index) const; @@ -62,12 +70,16 @@ class NgenSimulation private: void advance_models_one_output_step(); + void gather_flows_for_routing(size_t num_steps, boost::span local_downflows, boost::span gathered_downflows); + void advance_routing_one_step(boost::span downflows); + int simulation_step_; std::shared_ptr sim_time_; // Catchment data std::vector> layers_; + hy_features_t *features_; // Routing data structured for t-route std::unordered_map catchment_indexes_; @@ -75,6 +87,11 @@ class NgenSimulation std::unordered_map nexus_indexes_; std::vector nexus_downstream_flows_; + std::unique_ptr py_troute_ = nullptr; + size_t global_nexus_count_; + std::unordered_map global_nexus_indexes_; + std::unordered_map *routing_nexus_indexes_; + int mpi_rank_; int mpi_num_procs_; diff --git a/src/NGen.cpp b/src/NGen.cpp index c554ef751a..899a030521 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -708,11 +708,16 @@ int main(int argc, char* argv[]) { auto simulation = std::make_unique(*sim_time, layers, + &features, std::move(catchment_indexes), std::move(nexus_indexes), mpi_rank, mpi_num_procs); + if (manager->get_using_routing()) { + simulation->initialize_routing(manager->get_t_route_config_file_with_path()); + } + auto time_done_init = std::chrono::steady_clock::now(); std::chrono::duration time_elapsed_init = time_done_init - time_start; LOG("[TIMING]: Init: " + std::to_string(time_elapsed_init.count()), LogLevel::INFO); @@ -761,7 +766,7 @@ int main(int argc, char* argv[]) { #endif if (manager->get_using_routing()) { - simulation->run_routing(features, manager->get_t_route_config_file_with_path()); + simulation->run_routing(); } auto time_done_routing = std::chrono::steady_clock::now(); diff --git a/src/core/NgenSimulation.cpp b/src/core/NgenSimulation.cpp index c5cce69103..f4cae4f8cb 100644 --- a/src/core/NgenSimulation.cpp +++ b/src/core/NgenSimulation.cpp @@ -17,6 +17,7 @@ NgenSimulation::NgenSimulation( Simulation_Time const& sim_time, std::vector> layers, + hy_features_t *features, std::unordered_map catchment_indexes, std::unordered_map nexus_indexes, int mpi_rank, @@ -25,8 +26,10 @@ NgenSimulation::NgenSimulation( : simulation_step_(0) , sim_time_(std::make_shared(sim_time)) , layers_(std::move(layers)) + , features_(features) , catchment_indexes_(std::move(catchment_indexes)) , nexus_indexes_(std::move(nexus_indexes)) + , routing_nexus_indexes_(&nexus_indexes) , mpi_rank_(mpi_rank) , mpi_num_procs_(mpi_num_procs) { @@ -41,6 +44,16 @@ void NgenSimulation::run_catchments() // Now loop some time, iterate catchments, do stuff for total number of output times auto num_times = get_num_output_times(); +#if NGEN_WITH_MPI + std::vector all_nexus_downflows; + + if (mpi_num_procs_ > 1) { + if (mpi_rank_ == 0) { + all_nexus_downflows.resize(global_nexus_count_); + } + } +#endif // NGEN_WITH_MPI + for (; simulation_step_ < num_times; simulation_step_++) { // Make room for this output step's results catchment_outflows_.resize(catchment_outflows_.size() + catchment_indexes_.size(), 0.0); @@ -48,6 +61,22 @@ void NgenSimulation::run_catchments() advance_models_one_output_step(); +#if NGEN_WITH_ROUTING + boost::span routing_nexus_downflows = {nexus_downstream_flows_.data() + nexus_indexes_.size() * simulation_step_, + nexus_indexes_.size()}; + +#if NGEN_WITH_MPI + if (mpi_num_procs_ > 1) { + gather_flows_for_routing(1, routing_nexus_downflows, all_nexus_downflows); + routing_nexus_downflows = all_nexus_downflows; + } +#endif // NGEN_WITH_MPI + + if (mpi_rank_ == 0) { // Run t-route from single process + advance_routing_one_step(routing_nexus_downflows); + } +#endif // NGEN_WITH_ROUTING + if (simulation_step_ + 1 < num_times) { sim_time_->advance_timestep(); } @@ -119,23 +148,13 @@ double NgenSimulation::get_nexus_outflow(int nexus_index, int timestep_index) co return nexus_downstream_flows_[timestep_index * nexus_indexes_.size() + nexus_index]; } -void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::string const& t_route_config_file_with_path) +void NgenSimulation::initialize_routing(std::string const& t_route_config_file_with_path) { #if NGEN_WITH_ROUTING - std::vector *routing_nexus_downflows = &nexus_downstream_flows_; std::unordered_map *routing_nexus_indexes = &nexus_indexes_; - - size_t number_of_timesteps = sim_time_->get_total_output_times(); - if (nexus_downstream_flows_.size() != number_of_timesteps * nexus_indexes_.size()) { - std::string msg = "Routing input data in NgenSimulation::nexus_downstream_flows_ does not reflect a full-duration run"; - LOG(msg, LogLevel::FATAL); - throw std::runtime_error(msg); - } + global_nexus_count_ = nexus_indexes_.size(); #if NGEN_WITH_MPI - std::vector all_nexus_downflows; - std::unordered_map all_nexus_indexes; - if (mpi_num_procs_ > 1) { std::vector local_nexus_ids; for (const auto& nexus : nexus_indexes_) { @@ -154,64 +173,45 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s // MPI_Broadcast so all processes share the nexus IDs all_nexus_ids = std::move(parallel::broadcast_strings(all_nexus_ids, mpi_rank_, mpi_num_procs_)); - // MPI_Reduce to collect the results from processes - if (mpi_rank_ == 0) { - all_nexus_downflows.resize(number_of_timesteps * all_nexus_ids.size(), 0.0); - } - std::vector local_buffer(number_of_timesteps); - std::vector receive_buffer(number_of_timesteps, 0.0); for (int i = 0; i < all_nexus_ids.size(); ++i) { - std::string nexus_id = all_nexus_ids[i]; - if (nexus_indexes_.find(nexus_id) != nexus_indexes_.end() && !features.is_remote_sender_nexus(nexus_id)) { - // if this process has the id and receives/records data, copy the values to the buffer - int nexus_index = nexus_indexes_[nexus_id]; - for (int step = 0; step < number_of_timesteps; ++step) { - int offset = step * nexus_indexes_.size() + nexus_index; - local_buffer[step] = nexus_downstream_flows_[offset]; - } - } else { - // if this process does not have the id, fill with 0 to make sure it doesn't affect reduce sum - std::fill(local_buffer.begin(), local_buffer.end(), 0.0); - } - MPI_Reduce(local_buffer.data(), receive_buffer.data(), number_of_timesteps, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD); - if (mpi_rank_ == 0) { - // copy reduce values to a combined downflows vector - all_nexus_indexes[nexus_id] = i; - for (int step = 0; step < number_of_timesteps; ++step) { - int offset = step * all_nexus_ids.size() + i; - all_nexus_downflows[offset] = receive_buffer[step]; - receive_buffer[step] = 0.0; - } - } + std::string const& nexus_id = all_nexus_ids[i]; + global_nexus_indexes_[nexus_id] = i; } - if (mpi_rank_ == 0) { - // update root's local data for running t-route below - routing_nexus_indexes = &all_nexus_indexes; - routing_nexus_downflows = &all_nexus_downflows; - } + routing_nexus_indexes_ = &global_nexus_indexes_; + global_nexus_count_ = all_nexus_ids.size(); } #endif // NGEN_WITH_MPI - if (mpi_rank_ == 0) { // Run t-route from single process - LOG(LogLevel::INFO, "Running T-Route on nexus outflows."); + if (mpi_rank_ == 0) { + py_troute_ = std::make_unique("T-Route", t_route_config_file_with_path, "troute_nwm_bmi.troute_bmi.BmiTroute", true); - // Note: Currently, delta_time is set in the t-route yaml configuration file, and the - // number_of_timesteps is determined from the total number of nexus outputs in t-route. - // It is recommended to still pass these values to the routing_py_adapter object in - // case a future implementation needs these two values from the ngen framework. - int delta_time = sim_time_->get_output_interval_seconds(); + // Error/inconsistency checking + { + std::string time_units = py_troute_->GetTimeUnits(); + if (time_units != "s") { + Logger::logMsgAndThrowError("T-route units were expected to be 's', got " + time_units); + } - // model for routing - models::bmi::Bmi_Py_Adapter py_troute("T-Route", t_route_config_file_with_path, "troute_nwm_bmi.troute_bmi.BmiTroute", true); + // Note: Currently, delta_time is set in the t-route yaml configuration file, and the + // number_of_timesteps is determined from the total number of nexus outputs in t-route. + // It is recommended to still pass these values to the routing_py_adapter object in + // case a future implementation needs these two values from the ngen framework. + int delta_time = sim_time_->get_output_interval_seconds(); + double bmi_timestep = py_troute_->GetTimeStep(); + if (bmi_timestep != delta_time) { + Logger::logMsgAndThrowError("T-route timestep " + std::to_string(bmi_timestep) + " s" + + "doesn't match simulation timestep " + std::to_string(delta_time) + " s "); + } + } // tell BMI to resize nexus containers - int64_t nexus_count = routing_nexus_indexes->size(); - py_troute.SetValue("land_surface_water_source__volume_flow_rate__count", &nexus_count); - py_troute.SetValue("land_surface_water_source__id__count", &nexus_count); + py_troute_->SetValue("land_surface_water_source__volume_flow_rate__count", &global_nexus_count_); + py_troute_->SetValue("land_surface_water_source__id__count", &global_nexus_count_); + // set up nexus id indexes - std::vector nexus_df_index(nexus_count); - for (const auto& key_value : *routing_nexus_indexes) { + std::vector nexus_df_index(global_nexus_count_); + for (const auto& key_value : *routing_nexus_indexes_) { int id_index = key_value.second; // Convert string ID into numbers for T-route index @@ -228,14 +228,65 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s } nexus_df_index[id_index] = id_as_int; } - py_troute.SetValue("land_surface_water_source__id", nexus_df_index.data()); - for (int i = 0; i < number_of_timesteps; ++i) { - py_troute.SetValue("land_surface_water_source__volume_flow_rate", - routing_nexus_downflows->data() + (i * nexus_count)); - py_troute.Update(); + py_troute_->SetValue("land_surface_water_source__id", nexus_df_index.data()); + } +#endif // NGEN_WITH_ROUTING +} + +// Only called in the mpi_num_procs_ > 1 case +void NgenSimulation::gather_flows_for_routing(size_t num_steps, + boost::span local_downflows, + boost::span gathered_downflows) +{ +#if NGEN_WITH_MPI + std::vector local_buffer(num_steps); + std::vector receive_buffer(num_steps, 0.0); + + for (auto const& global_nexus_iter : global_nexus_indexes_) { + std::string const& nexus_id = global_nexus_iter.first; + int global_nexus_index = global_nexus_iter.second; + + auto local_iter = nexus_indexes_.find(nexus_id); + if (local_iter != nexus_indexes_.end() && !features_->is_remote_sender_nexus(nexus_id)) { + // if this process has the id and receives/records data, copy the values to the buffer + int nexus_index = local_iter->second; + for (int step = 0; step < num_steps; ++step) { + int offset = step * nexus_indexes_.size() + nexus_index; + local_buffer[step] = local_downflows[offset]; + } + } else { + // if this process does not have the id, fill with 0 to make sure it doesn't affect reduce sum + std::fill(local_buffer.begin(), local_buffer.end(), 0.0); + } + + MPI_Reduce(local_buffer.data(), receive_buffer.data(), num_steps, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD); + + if (mpi_rank_ == 0) { + // copy reduce values to a combined downflows vector + for (int step = 0; step < num_steps; ++step) { + int offset = step * global_nexus_count_ + global_nexus_index; + gathered_downflows[offset] = receive_buffer[step]; + receive_buffer[step] = 0.0; + } } - // Finalize will write the output file - py_troute.Finalize(); + } +#endif +} + +void NgenSimulation::advance_routing_one_step(boost::span downflows) +{ + py_troute_->SetValue("land_surface_water_source__volume_flow_rate", downflows.data()); + py_troute_->Update(); +} + +void NgenSimulation::run_routing() +{ +#if NGEN_WITH_ROUTING + if (mpi_rank_ == 0) { // Run t-route from single process + LOG(LogLevel::INFO, "Running T-Route on nexus outflows."); + + // Finalize will finish running the simulation and write the output file + py_troute_->Finalize(); } #endif // NGEN_WITH_ROUTING }