Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions include/core/NgenSimulation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@

#include <Simulation_Time.hpp>
#include <Layer.hpp>
#include <Bmi_Adapter.hpp>

namespace hy_features
{
class HY_Features;
class HY_Features_MPI;
}

namespace models::bmi
{
class Bmi_Py_Adapter;
}

#include <memory>
#include <vector>
#include <unordered_map>
Expand All @@ -21,9 +27,16 @@ namespace hy_features
class NgenSimulation
{
public:
#if NGEN_WITH_MPI
using hy_features_t = hy_features::HY_Features_MPI;
#else
using hy_features_t = hy_features::HY_Features;
#endif

NgenSimulation(
Simulation_Time const& sim_time,
std::vector<std::shared_ptr<ngen::Layer>> layers,
hy_features_t *features,
std::unordered_map<std::string, int> catchment_indexes,
std::unordered_map<std::string, int> nexus_indexes,
int mpi_rank,
Expand All @@ -33,12 +46,6 @@ class NgenSimulation

~NgenSimulation();

#if NGEN_WITH_MPI
using hy_features_t = hy_features::HY_Features_MPI;
#else
using hy_features_t = hy_features::HY_Features;
#endif

/**
* Run the catchment formulations for the full configured duration of the simulation
*
Expand All @@ -48,10 +55,11 @@ class NgenSimulation
*/
void run_catchments();

void initialize_routing(std::string const& t_route_config_file_with_path);
/**
* Run t-route on the stored nexus outflow values for the full configured duration of the simulation
*/
void run_routing(hy_features_t &features, std::string const& t_route_config_file_with_path);
void run_routing();

int get_nexus_index(std::string const& nexus_id) const;
double get_nexus_outflow(int nexus_index, int timestep_index) const;
Expand All @@ -62,19 +70,28 @@ class NgenSimulation
private:
void advance_models_one_output_step();

void gather_flows_for_routing(size_t num_steps, boost::span<double> local_downflows, boost::span<double> gathered_downflows);
void advance_routing_one_step(boost::span<double> downflows);

int simulation_step_;

std::shared_ptr<Simulation_Time> sim_time_;

// Catchment data
std::vector<std::shared_ptr<ngen::Layer>> layers_;
hy_features_t *features_;

// Routing data structured for t-route
std::unordered_map<std::string, int> catchment_indexes_;
std::vector<double> catchment_outflows_;
std::unordered_map<std::string, int> nexus_indexes_;
std::vector<double> nexus_downstream_flows_;

std::unique_ptr<models::bmi::Bmi_Adapter> py_troute_ = nullptr;
size_t global_nexus_count_;
std::unordered_map<std::string, int> global_nexus_indexes_;
std::unordered_map<std::string, int> *routing_nexus_indexes_;

int mpi_rank_;
int mpi_num_procs_;

Expand Down
7 changes: 6 additions & 1 deletion src/NGen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,11 +708,16 @@ int main(int argc, char* argv[]) {

auto simulation = std::make_unique<NgenSimulation>(*sim_time,
layers,
&features,
std::move(catchment_indexes),
std::move(nexus_indexes),
mpi_rank,
mpi_num_procs);

if (manager->get_using_routing()) {
simulation->initialize_routing(manager->get_t_route_config_file_with_path());
}

auto time_done_init = std::chrono::steady_clock::now();
std::chrono::duration<double> time_elapsed_init = time_done_init - time_start;
LOG("[TIMING]: Init: " + std::to_string(time_elapsed_init.count()), LogLevel::INFO);
Expand Down Expand Up @@ -761,7 +766,7 @@ int main(int argc, char* argv[]) {
#endif

if (manager->get_using_routing()) {
simulation->run_routing(features, manager->get_t_route_config_file_with_path());
simulation->run_routing();
}

auto time_done_routing = std::chrono::steady_clock::now();
Expand Down
183 changes: 117 additions & 66 deletions src/core/NgenSimulation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
NgenSimulation::NgenSimulation(
Simulation_Time const& sim_time,
std::vector<std::shared_ptr<ngen::Layer>> layers,
hy_features_t *features,
std::unordered_map<std::string, int> catchment_indexes,
std::unordered_map<std::string, int> nexus_indexes,
int mpi_rank,
Expand All @@ -25,8 +26,10 @@ NgenSimulation::NgenSimulation(
: simulation_step_(0)
, sim_time_(std::make_shared<Simulation_Time>(sim_time))
, layers_(std::move(layers))
, features_(features)
, catchment_indexes_(std::move(catchment_indexes))
, nexus_indexes_(std::move(nexus_indexes))
, routing_nexus_indexes_(&nexus_indexes)
, mpi_rank_(mpi_rank)
, mpi_num_procs_(mpi_num_procs)
{
Expand All @@ -41,13 +44,39 @@ void NgenSimulation::run_catchments()
// Now loop some time, iterate catchments, do stuff for total number of output times
auto num_times = get_num_output_times();

#if NGEN_WITH_MPI
std::vector<double> all_nexus_downflows;

if (mpi_num_procs_ > 1) {
if (mpi_rank_ == 0) {
all_nexus_downflows.resize(global_nexus_count_);
}
}
#endif // NGEN_WITH_MPI

for (; simulation_step_ < num_times; simulation_step_++) {
// Make room for this output step's results
catchment_outflows_.resize(catchment_outflows_.size() + catchment_indexes_.size(), 0.0);
nexus_downstream_flows_.resize(nexus_downstream_flows_.size() + nexus_indexes_.size(), 0.0);

advance_models_one_output_step();

#if NGEN_WITH_ROUTING
boost::span<double> routing_nexus_downflows = {nexus_downstream_flows_.data() + nexus_indexes_.size() * simulation_step_,
nexus_indexes_.size()};

#if NGEN_WITH_MPI
if (mpi_num_procs_ > 1) {
gather_flows_for_routing(1, routing_nexus_downflows, all_nexus_downflows);
routing_nexus_downflows = all_nexus_downflows;
}
#endif // NGEN_WITH_MPI

if (mpi_rank_ == 0) { // Run t-route from single process
advance_routing_one_step(routing_nexus_downflows);
}
#endif // NGEN_WITH_ROUTING

if (simulation_step_ + 1 < num_times) {
sim_time_->advance_timestep();
}
Expand Down Expand Up @@ -119,23 +148,13 @@ double NgenSimulation::get_nexus_outflow(int nexus_index, int timestep_index) co
return nexus_downstream_flows_[timestep_index * nexus_indexes_.size() + nexus_index];
}

void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::string const& t_route_config_file_with_path)
void NgenSimulation::initialize_routing(std::string const& t_route_config_file_with_path)
{
#if NGEN_WITH_ROUTING
std::vector<double> *routing_nexus_downflows = &nexus_downstream_flows_;
std::unordered_map<std::string, int> *routing_nexus_indexes = &nexus_indexes_;

size_t number_of_timesteps = sim_time_->get_total_output_times();
if (nexus_downstream_flows_.size() != number_of_timesteps * nexus_indexes_.size()) {
std::string msg = "Routing input data in NgenSimulation::nexus_downstream_flows_ does not reflect a full-duration run";
LOG(msg, LogLevel::FATAL);
throw std::runtime_error(msg);
}
global_nexus_count_ = nexus_indexes_.size();

#if NGEN_WITH_MPI
std::vector<double> all_nexus_downflows;
std::unordered_map<std::string, int> all_nexus_indexes;

if (mpi_num_procs_ > 1) {
std::vector<std::string> local_nexus_ids;
for (const auto& nexus : nexus_indexes_) {
Expand All @@ -154,64 +173,45 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s
// MPI_Broadcast so all processes share the nexus IDs
all_nexus_ids = std::move(parallel::broadcast_strings(all_nexus_ids, mpi_rank_, mpi_num_procs_));

// MPI_Reduce to collect the results from processes
if (mpi_rank_ == 0) {
all_nexus_downflows.resize(number_of_timesteps * all_nexus_ids.size(), 0.0);
}
std::vector<double> local_buffer(number_of_timesteps);
std::vector<double> receive_buffer(number_of_timesteps, 0.0);
for (int i = 0; i < all_nexus_ids.size(); ++i) {
std::string nexus_id = all_nexus_ids[i];
if (nexus_indexes_.find(nexus_id) != nexus_indexes_.end() && !features.is_remote_sender_nexus(nexus_id)) {
// if this process has the id and receives/records data, copy the values to the buffer
int nexus_index = nexus_indexes_[nexus_id];
for (int step = 0; step < number_of_timesteps; ++step) {
int offset = step * nexus_indexes_.size() + nexus_index;
local_buffer[step] = nexus_downstream_flows_[offset];
}
} else {
// if this process does not have the id, fill with 0 to make sure it doesn't affect reduce sum
std::fill(local_buffer.begin(), local_buffer.end(), 0.0);
}
MPI_Reduce(local_buffer.data(), receive_buffer.data(), number_of_timesteps, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
if (mpi_rank_ == 0) {
// copy reduce values to a combined downflows vector
all_nexus_indexes[nexus_id] = i;
for (int step = 0; step < number_of_timesteps; ++step) {
int offset = step * all_nexus_ids.size() + i;
all_nexus_downflows[offset] = receive_buffer[step];
receive_buffer[step] = 0.0;
}
}
std::string const& nexus_id = all_nexus_ids[i];
global_nexus_indexes_[nexus_id] = i;
}

if (mpi_rank_ == 0) {
// update root's local data for running t-route below
routing_nexus_indexes = &all_nexus_indexes;
routing_nexus_downflows = &all_nexus_downflows;
}
routing_nexus_indexes_ = &global_nexus_indexes_;
global_nexus_count_ = all_nexus_ids.size();
}
#endif // NGEN_WITH_MPI

if (mpi_rank_ == 0) { // Run t-route from single process
LOG(LogLevel::INFO, "Running T-Route on nexus outflows.");
if (mpi_rank_ == 0) {
py_troute_ = std::make_unique<models::bmi::Bmi_Py_Adapter>("T-Route", t_route_config_file_with_path, "troute_nwm_bmi.troute_bmi.BmiTroute", true);

// Note: Currently, delta_time is set in the t-route yaml configuration file, and the
// number_of_timesteps is determined from the total number of nexus outputs in t-route.
// It is recommended to still pass these values to the routing_py_adapter object in
// case a future implementation needs these two values from the ngen framework.
int delta_time = sim_time_->get_output_interval_seconds();
// Error/inconsistency checking
{
std::string time_units = py_troute_->GetTimeUnits();
if (time_units != "s") {
Logger::logMsgAndThrowError("T-route units were expected to be 's', got " + time_units);
}

// model for routing
models::bmi::Bmi_Py_Adapter py_troute("T-Route", t_route_config_file_with_path, "troute_nwm_bmi.troute_bmi.BmiTroute", true);
// Note: Currently, delta_time is set in the t-route yaml configuration file, and the
// number_of_timesteps is determined from the total number of nexus outputs in t-route.
// It is recommended to still pass these values to the routing_py_adapter object in
// case a future implementation needs these two values from the ngen framework.
int delta_time = sim_time_->get_output_interval_seconds();
double bmi_timestep = py_troute_->GetTimeStep();
if (bmi_timestep != delta_time) {
Logger::logMsgAndThrowError("T-route timestep " + std::to_string(bmi_timestep) + " s"
+ "doesn't match simulation timestep " + std::to_string(delta_time) + " s ");
}
}

// tell BMI to resize nexus containers
int64_t nexus_count = routing_nexus_indexes->size();
py_troute.SetValue("land_surface_water_source__volume_flow_rate__count", &nexus_count);
py_troute.SetValue("land_surface_water_source__id__count", &nexus_count);
py_troute_->SetValue("land_surface_water_source__volume_flow_rate__count", &global_nexus_count_);
py_troute_->SetValue("land_surface_water_source__id__count", &global_nexus_count_);

// set up nexus id indexes
std::vector<int> nexus_df_index(nexus_count);
for (const auto& key_value : *routing_nexus_indexes) {
std::vector<int> nexus_df_index(global_nexus_count_);
for (const auto& key_value : *routing_nexus_indexes_) {
int id_index = key_value.second;

// Convert string ID into numbers for T-route index
Expand All @@ -228,14 +228,65 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s
}
nexus_df_index[id_index] = id_as_int;
}
py_troute.SetValue("land_surface_water_source__id", nexus_df_index.data());
for (int i = 0; i < number_of_timesteps; ++i) {
py_troute.SetValue("land_surface_water_source__volume_flow_rate",
routing_nexus_downflows->data() + (i * nexus_count));
py_troute.Update();
py_troute_->SetValue("land_surface_water_source__id", nexus_df_index.data());
}
#endif // NGEN_WITH_ROUTING
}

// Only called in the mpi_num_procs_ > 1 case
void NgenSimulation::gather_flows_for_routing(size_t num_steps,
boost::span<double> local_downflows,
boost::span<double> gathered_downflows)
{
#if NGEN_WITH_MPI
std::vector<double> local_buffer(num_steps);
std::vector<double> receive_buffer(num_steps, 0.0);

for (auto const& global_nexus_iter : global_nexus_indexes_) {
std::string const& nexus_id = global_nexus_iter.first;
int global_nexus_index = global_nexus_iter.second;

auto local_iter = nexus_indexes_.find(nexus_id);
if (local_iter != nexus_indexes_.end() && !features_->is_remote_sender_nexus(nexus_id)) {
// if this process has the id and receives/records data, copy the values to the buffer
int nexus_index = local_iter->second;
for (int step = 0; step < num_steps; ++step) {
int offset = step * nexus_indexes_.size() + nexus_index;
local_buffer[step] = local_downflows[offset];
}
} else {
// if this process does not have the id, fill with 0 to make sure it doesn't affect reduce sum
std::fill(local_buffer.begin(), local_buffer.end(), 0.0);
}

MPI_Reduce(local_buffer.data(), receive_buffer.data(), num_steps, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

if (mpi_rank_ == 0) {
// copy reduce values to a combined downflows vector
for (int step = 0; step < num_steps; ++step) {
int offset = step * global_nexus_count_ + global_nexus_index;
gathered_downflows[offset] = receive_buffer[step];
receive_buffer[step] = 0.0;
}
}
// Finalize will write the output file
py_troute.Finalize();
}
#endif
}

void NgenSimulation::advance_routing_one_step(boost::span<double> downflows)
{
py_troute_->SetValue("land_surface_water_source__volume_flow_rate", downflows.data());
py_troute_->Update();
}

void NgenSimulation::run_routing()
{
#if NGEN_WITH_ROUTING
if (mpi_rank_ == 0) { // Run t-route from single process
LOG(LogLevel::INFO, "Running T-Route on nexus outflows.");

// Finalize will finish running the simulation and write the output file
py_troute_->Finalize();
}
#endif // NGEN_WITH_ROUTING
}
Expand Down