diff --git a/CMakeLists.txt b/CMakeLists.txt index ba15da154..955836b76 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -220,18 +220,13 @@ include(cmake/AddJanaTest.cmake) add_subdirectory(src/external) add_subdirectory(src/libraries/JANA) add_subdirectory(src/plugins) -add_subdirectory(src/programs/jana) +add_subdirectory(src/programs) add_subdirectory(src/python) if (${BUILD_EXAMPLES}) add_subdirectory(src/examples) endif() -if (${BUILD_TESTS}) - add_subdirectory(src/programs/unit_tests) - add_subdirectory(src/programs/perf_tests) -endif() - #--------------------------------------------------------------------------------------- diff --git a/docs/offloading.md b/docs/offloading.md new file mode 100644 index 000000000..9c9fe6098 --- /dev/null +++ b/docs/offloading.md @@ -0,0 +1,72 @@ + + + + +Work package: GPU arrows +------------------------ + +Problem: Arrows can only have _one_ next_input at any given time +Solution: +- GPU tasks can all be pushed by GPUArrow to _one_ queue per physical GPU + +Design: +- GPUMapArrow + - Knows which factory needs the GPU + - Knows which factories need to run upstream + - Knows how to pack and unpack the offloaded data + +- GPUTapArrow + - Knows what to do with the packed input data. How does it know this? + - Doesn't worry about pipelining the memory movement for now + +- GPUFactory + - I'm hoping we can reuse Inputs and Outputs for pack() and unpack(), because + they need to be in the JEvent _somehow_ + + +Problem: GPUTapArrow doesn't know what to do! Needs some kind of indication of + - What factory needs to be run on this thing? + - What arrow (what queue, technically) does the JEvent need to be returned to? + + One of the reasons this is a problem is because JEvent databundles are supposed to + be immutable, so this cannot be a factory output. Probably has to be a separate + field on the JEvent. Who sets this field, though? And via what interface? + + Note that the GPUMapArrow knows the factory and output queue (which MIGHT just be + its own input queue. Is this always the case?). Maybe GPUMapArrow should set this. + + So what should this be? (databundle_name, next_input_queue). + Quickly check whether next_input_queue is a arrow-local port index or a topology-wide queue id. + - Sadly it is an arrow-local port index + - The queue doesn't know it's own id + - So it's the responsibility of the TopologyBuilder to inform the GPUTapArrow what the output queue index is + - OR we only store the factoryname, and the GPUTapArrow looks up the correct port index for the given factoryname + +Problem: +I'm not sure if the concept of a continuation name is general enough to stick onto JEvent. Let's +think about the calibration workflow a little more. That might require re-running factories that have already run, +which poses a problem for immutability, but which we might be able to solve via event levels. + + +Suppose I unroll the loop and avoid having continuations. What a relief! Now each GPUTapArrow knows exactly what to do. +However, now I need to do a lot more in my JTopologyBuilder (to figure out this unroll) and my scheduler needs to enforce +mutual exclusion _across_ arrows. Both of these are very unpleasant to implement. I might need them eventually. But not today. + + +Resuming this! +Goal: Check whether I call Preprocess on a JFactory + + + + + + + + + + + + + + + diff --git a/src/examples/misc/SubeventCUDAExample/SubeventCUDAExample.cu b/src/examples/misc/SubeventCUDAExample/SubeventCUDAExample.cu index cc2b2cd2b..e9fd31e57 100644 --- a/src/examples/misc/SubeventCUDAExample/SubeventCUDAExample.cu +++ b/src/examples/misc/SubeventCUDAExample/SubeventCUDAExample.cu @@ -144,7 +144,7 @@ int main() { source_arrow->set_input(topology->event_pool); source_arrow->set_output(&events_in); - auto proc_arrow = new JEventMapArrow("simpleProcessor"); + auto proc_arrow = new JMapArrow("simpleProcessor"); proc_arrow->set_input(&events_out); proc_arrow->set_output(topology->event_pool); proc_arrow->add_processor(new SimpleProcessor); diff --git a/src/examples/misc/SubeventExample/SubeventExample.cc b/src/examples/misc/SubeventExample/SubeventExample.cc index d99feccb9..a6a9fedd9 100644 --- a/src/examples/misc/SubeventExample/SubeventExample.cc +++ b/src/examples/misc/SubeventExample/SubeventExample.cc @@ -7,8 +7,8 @@ #include #include -#include -#include +#include +#include #include #include @@ -111,7 +111,7 @@ int main() { source_arrow->attach(topology->event_pool, 0); source_arrow->attach(&events_in, 1); - auto proc_arrow = new JEventMapArrow("simpleProcessor"); + auto proc_arrow = new JMapArrow("simpleProcessor"); proc_arrow->attach(&events_out, 0); proc_arrow->attach(topology->event_pool, 1); proc_arrow->add_processor(new SimpleProcessor); diff --git a/src/libraries/JANA/CMakeLists.txt b/src/libraries/JANA/CMakeLists.txt index 42da51242..27a03b9a5 100644 --- a/src/libraries/JANA/CMakeLists.txt +++ b/src/libraries/JANA/CMakeLists.txt @@ -17,9 +17,9 @@ set(JANA2_SOURCES Topology/JArrow.cc Topology/JEventPool.cc - Topology/JEventSourceArrow.cc - Topology/JEventMapArrow.cc - Topology/JEventTapArrow.cc + Topology/JSourceArrow.cc + Topology/JMapArrow.cc + Topology/JTapArrow.cc Topology/JMultilevelSourceArrow.cc Topology/JTopologyBuilder.cc diff --git a/src/libraries/JANA/Engine/JExecutionEngine.cc b/src/libraries/JANA/Engine/JExecutionEngine.cc index 40e8cc9b5..ebcf89448 100644 --- a/src/libraries/JANA/Engine/JExecutionEngine.cc +++ b/src/libraries/JANA/Engine/JExecutionEngine.cc @@ -55,16 +55,16 @@ void JExecutionEngine::Init() { // Not sure how I feel about putting this here yet, but I think it will at least work in both cases it needs to. // The reason this works is because JTopologyBuilder::create_topology() has already been called before // JApplication::ProvideService(). - for (JArrow* arrow : m_topology->arrows) { + for (JArrow* arrow : m_topology->GetArrows()) { - arrow->initialize(); + arrow->Initialize(); m_arrow_states.emplace_back(); auto& arrow_state = m_arrow_states.back(); - arrow_state.is_source = arrow->is_source(); - arrow_state.is_sink = arrow->is_sink(); - arrow_state.is_parallel = arrow->is_parallel(); - arrow_state.next_input = arrow->get_next_port_index(); + arrow_state.is_source = arrow->IsSource(); + arrow_state.is_sink = arrow->IsSink(); + arrow_state.is_parallel = arrow->IsParallel(); + arrow_state.next_input = arrow->GetNextPortIndex(); } } @@ -125,17 +125,18 @@ void JExecutionEngine::ScaleWorkers(size_t nthreads) { if (prev_nthreads < nthreads) { // We are launching additional worker threads LOG_DEBUG(GetLogger()) << "Scaling up to " << nthreads << " worker threads" << LOG_END; + auto mapping = m_topology->GetProcessorMapping(); for (size_t worker_id=prev_nthreads; worker_id < nthreads; ++worker_id) { auto worker = std::make_unique(); worker->worker_id = worker_id; worker->is_stop_requested = false; - worker->cpu_id = m_topology->mapping.get_cpu_id(worker_id); - worker->location_id = m_topology->mapping.get_loc_id(worker_id); + worker->cpu_id = mapping.get_cpu_id(worker_id); + worker->location_id = mapping.get_loc_id(worker_id); worker->thread = new std::thread(&JExecutionEngine::RunWorker, this, Worker{worker_id, &worker->backtrace}); LOG_DEBUG(GetLogger()) << "Launching worker thread " << worker_id << " on cpu=" << worker->cpu_id << ", location=" << worker->location_id << LOG_END; m_worker_states.push_back(std::move(worker)); - bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None); + bool pin_to_cpu = (mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None); if (pin_to_cpu) { JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id); } @@ -316,14 +317,14 @@ void JExecutionEngine::HandleFailures() { // First, we log all of the failures we've found for (auto& worker: m_worker_states) { if (worker->is_timed_out) { - std::string arrow_name = (worker->last_arrow_id == static_cast(-1)) ? "(none)" : m_topology->arrows[worker->last_arrow_id]->get_name(); + std::string arrow_name = (worker->last_arrow_id == static_cast(-1)) ? "(none)" : m_topology->GetArrows()[worker->last_arrow_id]->GetName(); LOG_FATAL(GetLogger()) << "Timeout in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END; pthread_kill(worker->thread->native_handle(), SIGUSR2); LOG_INFO(GetLogger()) << "Worker thread signalled; waiting for backtrace capture." << LOG_END; worker->backtrace.WaitForCapture(); } if (worker->stored_exception != nullptr) { - std::string arrow_name = (worker->last_arrow_id == static_cast(-1)) ? "(none)" : m_topology->arrows[worker->last_arrow_id]->get_name(); + std::string arrow_name = (worker->last_arrow_id == static_cast(-1)) ? "(none)" : m_topology->GetArrows()[worker->last_arrow_id]->GetName(); LOG_FATAL(GetLogger()) << "Exception in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END; } } @@ -349,10 +350,10 @@ void JExecutionEngine::FinishTopology() { assert(m_runstatus == RunStatus::Paused); LOG_DEBUG(GetLogger()) << "Finishing processing..." << LOG_END; - for (auto* arrow : m_topology->arrows) { - arrow->finalize(); + for (auto* arrow : m_topology->GetArrows()) { + arrow->Finalize(); } - for (auto* pool: m_topology->pools) { + for (auto* pool: m_topology->GetPools()) { pool->Finalize(); } m_runstatus = RunStatus::Finished; @@ -391,16 +392,17 @@ JExecutionEngine::Perf JExecutionEngine::GetPerf() { JExecutionEngine::Worker JExecutionEngine::RegisterWorker() { std::unique_lock lock(m_mutex); + auto mapping = m_topology->GetProcessorMapping(); auto worker_id = m_worker_states.size(); auto worker = std::make_unique(); worker->worker_id = worker_id; worker->is_stop_requested = false; - worker->cpu_id = m_topology->mapping.get_cpu_id(worker_id); - worker->location_id = m_topology->mapping.get_loc_id(worker_id); + worker->cpu_id = mapping.get_cpu_id(worker_id); + worker->location_id = mapping.get_loc_id(worker_id); worker->thread = nullptr; m_worker_states.push_back(std::move(worker)); - bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None); + bool pin_to_cpu = (mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None); if (pin_to_cpu) { JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id); } @@ -419,7 +421,7 @@ void JExecutionEngine::RunWorker(Worker worker) { while (true) { ExchangeTask(task, worker.worker_id); if (task.arrow == nullptr) break; // Exit as soon as ExchangeTask() stops blocking - task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status); + task.arrow->Fire(task.input_event, task.outputs, task.output_count, task.status); } LOG_DEBUG(GetLogger()) << "Stopped worker thread " << worker.worker_id << LOG_END; } @@ -493,13 +495,13 @@ void JExecutionEngine::CheckinCompletedTask_Unsafe(Task& task, WorkerState& work arrow_state.total_processing_duration += processing_duration; for (size_t output=0; outputget_port(task.outputs[output].second).is_input) { + if (!task.arrow->GetPort(task.outputs[output].second).GetSkipFinishEvent()) { arrow_state.events_processed++; } } // Put each output in its correct queue or pool - task.arrow->push(task.outputs, task.output_count, worker.location_id); + task.arrow->Push(task.outputs, task.output_count, worker.location_id); if (task.status == JArrow::FireResult::Finished) { // If this is an eventsource self-terminating (the only thing that returns Status::Finished right now) it will @@ -558,10 +560,10 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) // TODO: Support next_visit_time so that we don't hammer blocked event sources // See if we can obtain an input event (this is silly) - JArrow* arrow = m_topology->arrows[arrow_id]; - // TODO: consider setting state.next_input, retrieving via fire() - auto port = arrow->get_next_port_index(); - JEvent* event = (port == -1) ? nullptr : arrow->pull(port, worker.location_id); + JArrow* arrow = m_topology->GetArrows()[arrow_id]; + // TODO: consider setting state.next_input, retrieving via Fire() + auto port = arrow->GetNextPortIndex(); + JEvent* event = (port == -1) ? nullptr : arrow->Pull(port, worker.location_id); if (event != nullptr || port == -1) { LOG_TRACE(GetLogger()) << "Scheduler: Found next ready arrow with id " << arrow_id << LOG_END; // We've found a task that is ready! @@ -657,18 +659,18 @@ void JExecutionEngine::PrintFinalReport() { size_t total_useful_ms = 0; for (size_t arrow_id=0; arrow_id < m_arrow_states.size(); ++arrow_id) { - auto* arrow = m_topology->arrows[arrow_id]; + auto* arrow = m_topology->GetArrows()[arrow_id]; auto& arrow_state = m_arrow_states[arrow_id]; auto useful_ms = std::chrono::duration_cast(arrow_state.total_processing_duration).count(); total_useful_ms += useful_ms; auto avg_latency = useful_ms*1.0/arrow_state.events_processed; auto throughput_bottleneck = 1000.0 / avg_latency; - if (arrow->is_parallel()) { + if (arrow->IsParallel()) { throughput_bottleneck *= thread_count; } - LOG_INFO(GetLogger()) << " - Arrow name: " << arrow->get_name() << LOG_END; - LOG_INFO(GetLogger()) << " Parallel: " << arrow->is_parallel() << LOG_END; + LOG_INFO(GetLogger()) << " - Arrow name: " << arrow->GetName() << LOG_END; + LOG_INFO(GetLogger()) << " Parallel: " << arrow->IsParallel() << LOG_END; LOG_INFO(GetLogger()) << " Events completed: " << arrow_state.events_processed << LOG_END; LOG_INFO(GetLogger()) << " Avg latency [ms/event]: " << avg_latency << LOG_END; LOG_INFO(GetLogger()) << " Throughput bottleneck [Hz]: " << throughput_bottleneck << LOG_END; @@ -708,12 +710,12 @@ bool JExecutionEngine::IsTimeoutEnabled() const { JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) { std::unique_lock lock(m_mutex); - if (arrow_id >= m_topology->arrows.size()) { + if (arrow_id >= m_topology->GetArrows().size()) { LOG_WARN(GetLogger()) << "Firing unsuccessful: No arrow exists with id=" << arrow_id << LOG_END; return JArrow::FireResult::NotRunYet; } - JArrow* arrow = m_topology->arrows[arrow_id]; - LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->get_name() + JArrow* arrow = m_topology->GetArrows()[arrow_id]; + LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->GetName() << ", index=" << arrow_id << ", location=" << location_id << LOG_END; ArrowState& arrow_state = m_arrow_states[arrow_id]; @@ -727,10 +729,10 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) { } arrow_state.active_tasks += 1; - auto port = arrow->get_next_port_index(); + auto port = arrow->GetNextPortIndex(); JEvent* event = nullptr; if (port != -1) { - event = arrow->pull(port, location_id); + event = arrow->Pull(port, location_id); if (event == nullptr) { LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END; arrow_state.active_tasks -= 1; @@ -750,8 +752,8 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) { JArrow::FireResult result = JArrow::FireResult::NotRunYet; LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END; - arrow->fire(event, outputs, output_count, result); - LOG_WARN(GetLogger()) << "Fired arrow with result " << to_string(result) << LOG_END; + arrow->Fire(event, outputs, output_count, result); + LOG_WARN(GetLogger()) << "Fired arrow with result " << ToString(result) << LOG_END; if (output_count == 0) { LOG_WARN(GetLogger()) << "No output events" << LOG_END; } @@ -762,7 +764,7 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) { } lock.lock(); - arrow->push(outputs, output_count, location_id); + arrow->Push(outputs, output_count, location_id); arrow_state.active_tasks -= 1; lock.unlock(); return result; diff --git a/src/libraries/JANA/JApplication.cc b/src/libraries/JANA/JApplication.cc index 1bf29237d..a4a83ec1b 100644 --- a/src/libraries/JANA/JApplication.cc +++ b/src/libraries/JANA/JApplication.cc @@ -156,7 +156,7 @@ void JApplication::Initialize() { m_desired_nthreads = JCpuInfo::GetNumCpus(); } - topology_builder->create_topology(); + topology_builder->CreateTopology(); m_params->SetDefaultParameter("jana:inspect", m_inspect, "Controls whether to drop immediately into the Inspector upon Run()"); diff --git a/src/libraries/JANA/JEventSource.h b/src/libraries/JANA/JEventSource.h index 0908f5895..fab50c664 100644 --- a/src/libraries/JANA/JEventSource.h +++ b/src/libraries/JANA/JEventSource.h @@ -78,7 +78,7 @@ class JEventSource : public jana::components::JComponent, /// For work that should be done in parallel on a JEvent, but is tightly coupled to the JEventSource for some reason. - /// Called after Emit() by JEventMapArrow, but only if EnableProcessParallel(true) is set. Note that the JEvent& is not + /// Called after Emit() by JMapArrow, but only if EnableProcessParallel(true) is set. Note that the JEvent& is not /// const here, because we need to be able to call event.Insert() from here. Also note that `this` IS const, because /// it is not safe to access any state in parallel from here. Note that this includes things like calibration constants. /// If you need to safely access state, put use a JFactory instead. diff --git a/src/libraries/JANA/Topology/JArrow.cc b/src/libraries/JANA/Topology/JArrow.cc index f6a9e4937..5783c92aa 100644 --- a/src/libraries/JANA/Topology/JArrow.cc +++ b/src/libraries/JANA/Topology/JArrow.cc @@ -1,45 +1,26 @@ #include -void JArrow::create_ports(size_t inputs, size_t outputs) { - m_ports.clear(); - for (size_t i=0; i= m_ports.size()) { - throw JException("Attempting to attach to a non-existent port! arrow=%s, port=%d", m_name.c_str(), port); +JArrow::Port& JArrow::AddPort(std::string name, JEventLevel level) { + if (m_port_lookup.find(name) != m_port_lookup.end()) { + throw JException("Port with name '%s' already exists", name.c_str()); } - m_ports[port].queue = queue; + auto port = std::make_unique(name, level); + auto port_raw_ptr = port.get(); + m_ports.push_back(std::move(port)); + m_port_lookup[name] = m_ports.size()-1; + return *port_raw_ptr; } - -void JArrow::attach(JEventPool* pool, size_t port) { - // Place index is relative to whether it is an input or not - // Port index, however, is agnostic to whether it is an input or not - if (port >= m_ports.size()) { - throw JException("Attempting to attach to a non-existent place! arrow=%s, port=%d", m_name.c_str(), port); - } - m_ports[port].pool = pool; -} - - -JEvent* JArrow::pull(size_t port_index, size_t location_id) { +JEvent* JArrow::Pull(size_t port_index, size_t location_id) { JEvent* event = nullptr; auto& port = m_ports.at(port_index); - if (port.queue != nullptr) { - event = port.queue->Pop(location_id); + if (port->GetQueue() != nullptr) { + event = port->GetQueue()->Pop(location_id); } - else if (port.pool != nullptr){ - event = port.pool->Pop( location_id); + else if (port->GetPool() != nullptr){ + event = port->GetPool()->Pop( location_id); } else { throw JException("Arrow %s: Port %d not wired!", m_name.c_str(), port_index); @@ -49,17 +30,17 @@ JEvent* JArrow::pull(size_t port_index, size_t location_id) { } -void JArrow::push(OutputData& outputs, size_t output_count, size_t location_id) { +void JArrow::Push(OutputData& outputs, size_t output_count, size_t location_id) { for (size_t output = 0; output < output_count; ++output) { JEvent* event = outputs[output].first; int port_index = outputs[output].second; - Port& port = m_ports.at(port_index); - if (port.queue != nullptr) { - port.queue->Push(event, location_id); + Port& port = GetPort(port_index); + if (port.GetQueue() != nullptr) { + port.GetQueue()->Push(event, location_id); } - else if (port.pool != nullptr) { - event->Clear(!port.is_input); - port.pool->Ingest(event, location_id); + else if (port.GetPool() != nullptr) { + event->Clear(!port.GetSkipFinishEvent()); + port.GetPool()->Ingest(event, location_id); } else { throw JException("Arrow %s: Port %d not wired!", m_name.c_str(), port_index); @@ -67,7 +48,7 @@ void JArrow::push(OutputData& outputs, size_t output_count, size_t location_id) } } -JArrow::FireResult JArrow::execute(size_t location_id) { +JArrow::FireResult JArrow::Execute(size_t location_id) { auto start_total_time = std::chrono::steady_clock::now(); if (m_next_visit_time > start_total_time) { @@ -77,7 +58,7 @@ JArrow::FireResult JArrow::execute(size_t location_id) { JEvent* input = nullptr; if (m_next_input_port != -1) { - input = pull(m_next_input_port, location_id); + input = Pull(m_next_input_port, location_id); } if (input == nullptr && m_next_input_port != -1) { @@ -89,18 +70,18 @@ JArrow::FireResult JArrow::execute(size_t location_id) { // Remember that `input` might be nullptr, in case arrow doesn't need any input event OutputData outputs; - size_t output_count; + size_t output_count = 0; JArrow::FireResult result = JArrow::FireResult::KeepGoing; - fire(input, outputs, output_count, result); + Fire(input, outputs, output_count, result); - push(outputs, output_count, location_id); + Push(outputs, output_count, location_id); return result; } -std::string to_string(JArrow::FireResult r) { +std::string ToString(JArrow::FireResult r) { switch (r) { case JArrow::FireResult::NotRunYet: return "NotRunYet"; case JArrow::FireResult::KeepGoing: return "KeepGoing"; diff --git a/src/libraries/JANA/Topology/JArrow.h b/src/libraries/JANA/Topology/JArrow.h index 224fe5601..2cc4747dc 100644 --- a/src/libraries/JANA/Topology/JArrow.h +++ b/src/libraries/JANA/Topology/JArrow.h @@ -19,72 +19,121 @@ class JArrow { using OutputData = std::array, 2>; enum class FireResult {NotRunYet, KeepGoing, ComeBackLater, Finished}; - struct Port { - JEventQueue* queue = nullptr; - JEventPool* pool = nullptr; - bool is_input = false; - bool establishes_ordering = false; - bool enforces_ordering = false; + class Port { + std::string m_name; + std::vector m_levels; + JEventQueue* m_queue = nullptr; + JEventPool* m_pool = nullptr; + bool m_skip_finish_event = false; + bool m_establishes_ordering = false; + bool m_enforces_ordering = false; + + public: + Port(std::string name, std::vector levels): m_name(name), m_levels(levels) {}; + + Port(std::string name, JEventLevel level): m_name(name) { + m_levels.push_back(level); + }; + + const std::string& GetName() { return m_name; } + const std::vector& GetLevels() { return m_levels; } + bool GetEstablishesOrdering() { return m_establishes_ordering; } + bool GetEnforcesOrdering() { return m_enforces_ordering; } + bool GetSkipFinishEvent() { return m_skip_finish_event; } + + Port& SetEstablishesOrdering(bool establishes) { + m_establishes_ordering = establishes; + return *this; + } + + Port& SetEnforcesOrdering(bool enforces) { + m_enforces_ordering = enforces; + return *this; + } + + Port& SetSkipFinishEvent(bool skip_finish_event) { + this->m_skip_finish_event = skip_finish_event; + return *this; + } + + inline JEventPool* GetPool() { return m_pool; } + inline JEventQueue* GetQueue() { return m_queue; } + + void Attach(JEventQueue* queue) { + this->m_pool = nullptr; + this->m_queue = queue; + } + + void Attach(JEventPool* pool) { + this->m_pool = pool; + this->m_queue = nullptr; + } }; private: - std::string m_name; // Used for human understanding - bool m_is_parallel; // Whether or not it is safe to parallelize - bool m_is_source; // Whether or not this arrow should activate/drain the topology - bool m_is_sink; // Whether or not tnis arrow contributes to the final event count + std::string m_name; // Used for human understanding + bool m_is_parallel = false; // Whether or not it is safe to parallelize + bool m_is_source = false; // Whether or not this arrow should activate/drain the topology + bool m_is_sink = false; // Whether or not tnis arrow contributes to the final event count protected: using clock_t = std::chrono::steady_clock; int m_next_input_port=0; // -1 denotes "no input necessary", e.g. for barrier events clock_t::time_point m_next_visit_time=clock_t::now(); - std::vector m_ports; + std::vector> m_ports; + std::map m_port_lookup; JLogger m_logger; public: - const std::string& get_name() { return m_name; } - JLogger& get_logger() { return m_logger; } - bool is_parallel() { return m_is_parallel; } - bool is_source() { return m_is_source; } - bool is_sink() { return m_is_sink; } - Port& get_port(size_t port_index) { return m_ports.at(port_index); } - int get_next_port_index() { return m_next_input_port; } - - void set_name(std::string name) { m_name = name; } - void set_logger(JLogger logger) { m_logger = logger; } - void set_is_parallel(bool is_parallel) { m_is_parallel = is_parallel; } - void set_is_source(bool is_source) { m_is_source = is_source; } - void set_is_sink(bool is_sink) { m_is_sink = is_sink; } - - - JArrow() { - m_is_parallel = false; - m_is_source = false; - m_is_sink = false; - } - - JArrow(std::string name, bool is_parallel, bool is_source, bool is_sink) : - m_name(std::move(name)), m_is_parallel(is_parallel), m_is_source(is_source), m_is_sink(is_sink) { - }; + JArrow() = default; virtual ~JArrow() = default; - virtual void initialize() { }; + virtual void Initialize() {}; + + virtual void Fire(JEvent*, OutputData&, size_t&, FireResult&) {}; + + virtual void Finalize() {}; - virtual FireResult execute(size_t location_id); - virtual void fire(JEvent*, OutputData&, size_t&, FireResult&) {}; + FireResult Execute(size_t location_id); - virtual void finalize() {}; + JEvent* Pull(size_t input_port, size_t location_id); - void create_ports(size_t inputs, size_t outputs); + void Push(OutputData& outputs, size_t output_count, size_t location_id); - void attach(JEventQueue* queue, size_t port); - void attach(JEventPool* pool, size_t port); - JEvent* pull(size_t input_port, size_t location_id); - void push(OutputData& outputs, size_t output_count, size_t location_id); + const std::string& GetName() { return m_name; } + JLogger& GetLogger() { return m_logger; } + bool IsParallel() { return m_is_parallel; } + bool IsSource() { return m_is_source; } + bool IsSink() { return m_is_sink; } + int GetNextPortIndex() { return m_next_input_port; } + + void SetName(std::string name) { m_name = name; } + void SetLogger(JLogger logger) { m_logger = logger; } + void SetIsParallel(bool is_parallel) { m_is_parallel = is_parallel; } + void SetIsSource(bool is_source) { m_is_source = is_source; } + void SetIsSink(bool is_sink) { m_is_sink = is_sink; } + + Port& AddPort(std::string port_name, JEventLevel level); + Port& AddPort(std::string port_name, std::vector levels); + Port& GetPort(size_t port_index) { return *m_ports.at(port_index); } + int GetPortIndex(const std::string& port_name) { + auto it = m_port_lookup.find(port_name); + if (it == m_port_lookup.end()) { + LOG_FATAL(GetLogger()) << "Unable to find port_name '" << port_name << "' on arrow '" << GetName() << "'. Valid port names are:"; + for (auto& port : m_ports) { + LOG_FATAL(GetLogger()) << " " << port->GetName(); + } + throw JException("Unable to find port_name '%s' on arrow '%s'", port_name.c_str(), GetName().c_str()); + } + return it->second; + } + void SetNextPortIndex(int input_port) { m_next_input_port = input_port; } + }; -std::string to_string(JArrow::FireResult r); +std::string ToString(JArrow::FireResult r); diff --git a/src/libraries/JANA/Topology/JFoldArrow.h b/src/libraries/JANA/Topology/JFoldArrow.h index 47ffb569c..71d729b89 100644 --- a/src/libraries/JANA/Topology/JFoldArrow.h +++ b/src/libraries/JANA/Topology/JFoldArrow.h @@ -25,17 +25,18 @@ class JFoldArrow : public JArrow { : m_parent_level(parent_level), m_child_level(child_level) { - set_name(name); - create_ports(1, 2); + SetName(name); + AddPort("child_in", child_level).SetEnforcesOrdering(true); + AddPort("child_out", child_level); + AddPort("parent_out", parent_level); m_next_input_port = CHILD_IN; - m_ports[CHILD_IN].enforces_ordering = true; } - void set_folder(JEventFolder* folder) { + void SetFolder(JEventFolder* folder) { m_folder = folder; } - void initialize() final { + void Initialize() final { if (m_folder != nullptr) { m_folder->DoInit(); LOG_INFO(m_logger) << "Initialized JEventFolder '" << m_folder->GetTypeName() << "'" << LOG_END; @@ -45,7 +46,7 @@ class JFoldArrow : public JArrow { } } - void finalize() final { + void Finalize() final { if (m_folder != nullptr) { m_folder->DoFinish(); LOG_INFO(m_logger) << "Finalized JEventFolder '" << m_folder->GetTypeName() << "'" << LOG_END; @@ -55,7 +56,7 @@ class JFoldArrow : public JArrow { } } - void fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) final { + void Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) final { assert(m_next_input_port == CHILD_IN); diff --git a/src/libraries/JANA/Topology/JEventMapArrow.cc b/src/libraries/JANA/Topology/JMapArrow.cc similarity index 69% rename from src/libraries/JANA/Topology/JEventMapArrow.cc rename to src/libraries/JANA/Topology/JMapArrow.cc index 72d43317d..2dcbae4dd 100644 --- a/src/libraries/JANA/Topology/JEventMapArrow.cc +++ b/src/libraries/JANA/Topology/JMapArrow.cc @@ -3,35 +3,35 @@ #include "JANA/JEventProcessor.h" -#include +#include #include #include #include -JEventMapArrow::JEventMapArrow(std::string name) { - set_name(name); - set_is_parallel(true); - create_ports(1, 1); - +JMapArrow::JMapArrow(std::string name, JEventLevel level) { + SetName(name); + SetIsParallel(true); + AddPort("in", level); + AddPort("out", level); } -void JEventMapArrow::add_source(JEventSource* source) { +void JMapArrow::AddSource(JEventSource* source) { m_sources.push_back(source); } -void JEventMapArrow::add_unfolder(JEventUnfolder* unfolder) { +void JMapArrow::AddUnfolder(JEventUnfolder* unfolder) { m_unfolders.push_back(unfolder); } -void JEventMapArrow::add_processor(JEventProcessor* processor) { +void JMapArrow::AddProcessor(JEventProcessor* processor) { m_procs.push_back(processor); } -void JEventMapArrow::fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) { +void JMapArrow::Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) { - LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << " for event# " << event->GetEventNumber() << LOG_END; + LOG_DEBUG(m_logger) << "Executing arrow " << GetName() << " for event# " << event->GetEventNumber() << LOG_END; for (JEventSource* source : m_sources) { JCallGraphEntryMaker cg_entry(*event->GetJCallGraphRecorder(), source->GetTypeName()); // times execution until this goes out of scope source->ProcessParallel(*event); @@ -49,14 +49,14 @@ void JEventMapArrow::fire(JEvent* event, OutputData& outputs, size_t& output_cou processor->DoMap(*event); } } - LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " for event# " << event->GetEventNumber() << LOG_END; + LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " for event# " << event->GetEventNumber() << LOG_END; outputs[0] = {event, 1}; output_count = 1; status = JArrow::FireResult::KeepGoing; } -void JEventMapArrow::initialize() { - LOG_DEBUG(m_logger) << "Initializing arrow '" << get_name() << "'" << LOG_END; +void JMapArrow::Initialize() { + LOG_DEBUG(m_logger) << "Initializing arrow '" << GetName() << "'" << LOG_END; for (auto processor : m_procs) { if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) { LOG_DEBUG(m_logger) << "Initializing JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END; @@ -64,11 +64,11 @@ void JEventMapArrow::initialize() { LOG_INFO(m_logger) << "Initialized JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END; } } - LOG_DEBUG(m_logger) << "Initialized arrow '" << get_name() << "'" << LOG_END; + LOG_DEBUG(m_logger) << "Initialized arrow '" << GetName() << "'" << LOG_END; } -void JEventMapArrow::finalize() { - LOG_DEBUG(m_logger) << "Finalizing arrow '" << get_name() << "'" << LOG_END; +void JMapArrow::Finalize() { + LOG_DEBUG(m_logger) << "Finalizing arrow '" << GetName() << "'" << LOG_END; for (auto processor : m_procs) { if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) { LOG_DEBUG(m_logger) << "Finalizing JEventProcessor " << processor->GetTypeName() << LOG_END; @@ -76,6 +76,6 @@ void JEventMapArrow::finalize() { LOG_INFO(m_logger) << "Finalized JEventProcessor " << processor->GetTypeName() << LOG_END; } } - LOG_DEBUG(m_logger) << "Finalized arrow " << get_name() << LOG_END; + LOG_DEBUG(m_logger) << "Finalized arrow " << GetName() << LOG_END; } diff --git a/src/libraries/JANA/Topology/JEventMapArrow.h b/src/libraries/JANA/Topology/JMapArrow.h similarity index 60% rename from src/libraries/JANA/Topology/JEventMapArrow.h rename to src/libraries/JANA/Topology/JMapArrow.h index 5eb3f0c39..72c7e1b21 100644 --- a/src/libraries/JANA/Topology/JEventMapArrow.h +++ b/src/libraries/JANA/Topology/JMapArrow.h @@ -12,7 +12,7 @@ class JEventProcessor; class JEvent; -class JEventMapArrow : public JArrow { +class JMapArrow : public JArrow { public: enum PortIndex {EVENT_IN=0, EVENT_OUT=1}; @@ -23,15 +23,15 @@ class JEventMapArrow : public JArrow { std::vector m_procs; public: - JEventMapArrow(std::string name); + JMapArrow(std::string name, JEventLevel level); - void add_source(JEventSource* source); - void add_unfolder(JEventUnfolder* unfolder); - void add_processor(JEventProcessor* proc); + void AddSource(JEventSource* source); + void AddUnfolder(JEventUnfolder* unfolder); + void AddProcessor(JEventProcessor* proc); - void fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& status); + void Fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& status); - void initialize() final; - void finalize() final; + void Initialize() final; + void Finalize() final; }; diff --git a/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc b/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc index acc718dbd..82abea323 100644 --- a/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc +++ b/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc @@ -13,13 +13,13 @@ void JMultilevelSourceArrow::SetEventSource(JEventSource* source) { size_t input_port_count = 0; size_t output_port_count = 0; for (auto level : m_levels) { + AddPort(toString(level) + "In", level).SetSkipFinishEvent(true); m_port_lookup[{level, Direction::In}] = input_port_count++; } for (auto level : m_levels) { + AddPort(toString(level) + "Out", level); m_port_lookup[{level, Direction::Out}] = input_port_count + output_port_count++; } - - create_ports(input_port_count, output_port_count); } const std::vector& JMultilevelSourceArrow::GetLevels() const { @@ -30,12 +30,12 @@ size_t JMultilevelSourceArrow::GetPortIndex(JEventLevel level, Direction directi return m_port_lookup.at({level, direction}); }; -void JMultilevelSourceArrow::initialize() { +void JMultilevelSourceArrow::Initialize() { // We initialize everything immediately, but don't open any resources until we absolutely have to; see process(): source->DoNext() m_source->DoInit(); } -void JMultilevelSourceArrow::finalize() { +void JMultilevelSourceArrow::Finalize() { // Generally JEventSources finalize themselves as soon as they detect that they have run out of events. // However, we can't rely on the JEventSources turning themselves off since execution can be externally paused. // Instead we leave everything open until we finalize the whole topology, and finalize remaining event sources then. @@ -52,23 +52,23 @@ void JMultilevelSourceArrow::EvictNextParent(OutputData& outputs, size_t& output if (it->second.first != nullptr) { // There IS an old parent size_t parent_output_port = GetPortIndex(m_next_input_level, Direction::Out); - LOG_DEBUG(get_logger()) << "JMultilevelSourceArrow: Evicting parent " << it->second.first->GetEventStamp() << " to port " << parent_output_port; + LOG_DEBUG(GetLogger()) << "JMultilevelSourceArrow: Evicting parent " << it->second.first->GetEventStamp() << " to port " << parent_output_port; outputs.at(output_count++) = {it->second.first, parent_output_port}; it->second.first = nullptr; } } } -void JMultilevelSourceArrow::fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) { +void JMultilevelSourceArrow::Fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) { if (!m_finish_in_progress) { - LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << LOG_END; + LOG_DEBUG(m_logger) << "Executing arrow " << GetName() << LOG_END; auto result = m_source->DoNext(input->shared_from_this()); m_next_input_level = m_source->GetNextInputLevel(); m_next_input_port = GetPortIndex(m_next_input_level, Direction::In); - LOG_DEBUG(get_logger()) << "JMultilevelSourceArrow: Returned from DoNext(" << toString(input->GetLevel()) << "). Next input level is " << toString(m_next_input_level); + LOG_DEBUG(GetLogger()) << "JMultilevelSourceArrow: Returned from DoNext(" << toString(input->GetLevel()) << "). Next input level is " << toString(m_next_input_level); if (result == JEventSource::Result::Success) { // We have a newly filled event we have to do something with @@ -80,7 +80,7 @@ void JMultilevelSourceArrow::fire(JEvent* input, OutputData& outputs, size_t& ou // Note that this only attaches parents that we already, so if the parents arrive in the wrong order they // will just be missing. If this is expected behavior, you'll need to set your downstream parent inputs to be optional. if (parent_pair.first != nullptr) { - LOG_DEBUG(get_logger()) << "JMultilevelSourceArrow: Attaching parent: " << parent_pair.first->GetEventStamp() << " to event " << input->GetEventStamp(); + LOG_DEBUG(GetLogger()) << "JMultilevelSourceArrow: Attaching parent: " << parent_pair.first->GetEventStamp() << " to event " << input->GetEventStamp(); input->SetParent(parent_pair.first); } } diff --git a/src/libraries/JANA/Topology/JMultilevelSourceArrow.h b/src/libraries/JANA/Topology/JMultilevelSourceArrow.h index 387a3ad71..7f2bbb509 100644 --- a/src/libraries/JANA/Topology/JMultilevelSourceArrow.h +++ b/src/libraries/JANA/Topology/JMultilevelSourceArrow.h @@ -21,14 +21,18 @@ class JMultilevelSourceArrow : public JArrow { void EvictNextParent(OutputData& outputs, size_t& output_count); public: + JMultilevelSourceArrow() { + SetIsSource(true); + } + const std::vector& GetLevels() const; size_t GetPortIndex(JEventLevel level, Direction direction) const; void SetEventSource(JEventSource* source); - void initialize() override; - void fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) override; - void finalize() override; + void Initialize() override; + void Fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) override; + void Finalize() override; }; diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.cc b/src/libraries/JANA/Topology/JSourceArrow.cc similarity index 73% rename from src/libraries/JANA/Topology/JEventSourceArrow.cc rename to src/libraries/JANA/Topology/JSourceArrow.cc index fb5782027..a9ae317db 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.cc +++ b/src/libraries/JANA/Topology/JSourceArrow.cc @@ -5,16 +5,17 @@ #include #include -#include +#include -JEventSourceArrow::JEventSourceArrow(std::string name, std::vector sources) +JSourceArrow::JSourceArrow(std::string name, JEventLevel level, std::vector sources) : m_sources(sources) { - set_name(name); - set_is_source(true); - create_ports(1, 1); - m_ports[EVENT_OUT].establishes_ordering = true; + SetName(name); + SetIsSource(true); + AddPort("in", level).SetSkipFinishEvent(true); + AddPort("out", level).SetEstablishesOrdering(true); + // All event sources establish their own ordering by default, // which is sufficient for the kinds of topologies we can create // using JTopologyBuilder. In the future we may need something @@ -24,9 +25,9 @@ JEventSourceArrow::JEventSourceArrow(std::string name, std::vectorDoNext(event->shared_from_this()); if (source_status == JEventSource::Result::FailureFinished) { - LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result FailureFinished"<< LOG_END; + LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result FailureFinished"<< LOG_END; m_current_source++; // TODO: Adjust nskip and nevents for the new source } else if (source_status == JEventSource::Result::FailureTryAgain){ // This JEventSource isn't finished yet, so we obtained either Success or TryAgainLater - LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END; + LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result ComeBackLater"<< LOG_END; outputs[0] = {event, 0}; // Reject output_count = 1; status = JArrow::FireResult::ComeBackLater; @@ -109,7 +110,7 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_ } else if (event->GetSequential()){ // Source succeeded, but returned a barrier event - LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, holding back barrier event# " << event->GetEventNumber() << LOG_END; + LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result Success, holding back barrier event# " << event->GetEventNumber() << LOG_END; m_pending_barrier_event = event; m_barrier_active = true; m_next_input_port = -1; // Stop popping events from the input queue until barrier event has finished @@ -121,7 +122,7 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_ } else { // Source succeeded, did NOT return a barrier event - LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, emitting event# " << event->GetEventNumber() << LOG_END; + LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result Success, emitting event# " << event->GetEventNumber() << LOG_END; outputs[0] = {event, 1}; // SUCCESS! output_count = 1; status = JArrow::FireResult::KeepGoing; @@ -135,7 +136,7 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_ status = JArrow::FireResult::Finished; } -void JEventSourceArrow::initialize() { +void JSourceArrow::Initialize() { // We initialize everything immediately, but don't open any resources until we absolutely have to; see process(): source->DoNext() for (JEventSource* source : m_sources) { source->DoInit(); @@ -143,7 +144,7 @@ void JEventSourceArrow::initialize() { } } -void JEventSourceArrow::finalize() { +void JSourceArrow::Finalize() { // Generally JEventSources finalize themselves as soon as they detect that they have run out of events. // However, we can't rely on the JEventSources turning themselves off since execution can be externally paused. // Instead we leave everything open until we finalize the whole topology, and finalize remaining event sources then. diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.h b/src/libraries/JANA/Topology/JSourceArrow.h similarity index 64% rename from src/libraries/JANA/Topology/JEventSourceArrow.h rename to src/libraries/JANA/Topology/JSourceArrow.h index c33c784f0..64b9b08c7 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.h +++ b/src/libraries/JANA/Topology/JSourceArrow.h @@ -6,7 +6,7 @@ #include -class JEventSourceArrow : public JArrow { +class JSourceArrow : public JArrow { public: enum PortIndex {EVENT_IN=0, EVENT_OUT=1}; @@ -17,10 +17,10 @@ class JEventSourceArrow : public JArrow { JEvent* m_pending_barrier_event = nullptr; public: - JEventSourceArrow(std::string name, std::vector sources); + JSourceArrow(std::string name, JEventLevel level, std::vector sources); - void initialize() final; - void finalize() final; - void fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& status); + void Initialize() final; + void Finalize() final; + void Fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& status); }; diff --git a/src/libraries/JANA/Topology/JEventTapArrow.cc b/src/libraries/JANA/Topology/JTapArrow.cc similarity index 61% rename from src/libraries/JANA/Topology/JEventTapArrow.cc rename to src/libraries/JANA/Topology/JTapArrow.cc index f24a88710..4ea85060f 100644 --- a/src/libraries/JANA/Topology/JEventTapArrow.cc +++ b/src/libraries/JANA/Topology/JTapArrow.cc @@ -2,27 +2,28 @@ // Subject to the terms in the LICENSE file found in the top-level directory. -#include +#include #include #include #include -JEventTapArrow::JEventTapArrow(std::string name) { - set_name(name); - create_ports(1,1); +JTapArrow::JTapArrow(std::string name, JEventLevel level) { + SetName(name); + AddPort("in", level); + AddPort("out", level); } -void JEventTapArrow::add_processor(JEventProcessor* proc) { +void JTapArrow::AddProcessor(JEventProcessor* proc) { if (proc->IsOrderingEnabled()) { - m_ports[EVENT_IN].enforces_ordering = true; + m_ports[EVENT_IN]->SetEnforcesOrdering(true); } m_procs.push_back(proc); } -void JEventTapArrow::fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) { +void JTapArrow::Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) { - LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << " for event# " << event->GetEventNumber() << LOG_END; + LOG_DEBUG(m_logger) << "Executing arrow " << GetName() << " for event# " << event->GetEventNumber() << LOG_END; for (JEventProcessor* proc : m_procs) { JCallGraphEntryMaker cg_entry(*event->GetJCallGraphRecorder(), proc->GetTypeName()); // times execution until this goes out of scope if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) { @@ -32,26 +33,26 @@ void JEventTapArrow::fire(JEvent* event, OutputData& outputs, size_t& output_cou outputs[0] = {event, 1}; output_count = 1; status = JArrow::FireResult::KeepGoing; - LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " for event# " << event->GetEventNumber() << LOG_END; + LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " for event# " << event->GetEventNumber() << LOG_END; } -void JEventTapArrow::initialize() { - LOG_DEBUG(m_logger) << "Initializing arrow '" << get_name() << "'" << LOG_END; +void JTapArrow::Initialize() { + LOG_DEBUG(m_logger) << "Initializing arrow '" << GetName() << "'" << LOG_END; for (auto processor : m_procs) { LOG_DEBUG(m_logger) << "Initializing JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END; processor->DoInit(); LOG_INFO(m_logger) << "Initialized JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END; } - LOG_DEBUG(m_logger) << "Initialized arrow '" << get_name() << "'" << LOG_END; + LOG_DEBUG(m_logger) << "Initialized arrow '" << GetName() << "'" << LOG_END; } -void JEventTapArrow::finalize() { - LOG_DEBUG(m_logger) << "Finalizing arrow '" << get_name() << "'" << LOG_END; +void JTapArrow::Finalize() { + LOG_DEBUG(m_logger) << "Finalizing arrow '" << GetName() << "'" << LOG_END; for (auto processor : m_procs) { LOG_DEBUG(m_logger) << "Finalizing JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END; processor->DoFinalize(); LOG_INFO(m_logger) << "Finalized JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END; } - LOG_DEBUG(m_logger) << "Finalized arrow '" << get_name() << "'" << LOG_END; + LOG_DEBUG(m_logger) << "Finalized arrow '" << GetName() << "'" << LOG_END; } diff --git a/src/libraries/JANA/Topology/JEventTapArrow.h b/src/libraries/JANA/Topology/JTapArrow.h similarity index 55% rename from src/libraries/JANA/Topology/JEventTapArrow.h rename to src/libraries/JANA/Topology/JTapArrow.h index 1a3975bc1..f8d639897 100644 --- a/src/libraries/JANA/Topology/JEventTapArrow.h +++ b/src/libraries/JANA/Topology/JTapArrow.h @@ -3,13 +3,14 @@ #pragma once +#include "JANA/Utils/JEventLevel.h" #include class JEventProcessor; class JEvent; -class JEventTapArrow : public JArrow { +class JTapArrow : public JArrow { public: enum PortIndex {EVENT_IN=0, EVENT_OUT=1}; @@ -17,12 +18,12 @@ class JEventTapArrow : public JArrow { std::vector m_procs; public: - JEventTapArrow(std::string name); + JTapArrow(std::string name, JEventLevel level=JEventLevel::PhysicsEvent); - void add_processor(JEventProcessor* proc); + void AddProcessor(JEventProcessor* proc); - void fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) final; - void initialize() final; - void finalize() final; + void Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) final; + void Initialize() final; + void Finalize() final; }; diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index 0abd4543d..af621a0e1 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -2,18 +2,20 @@ // Copyright 2020, Jefferson Science Associates, LLC. // Subject to the terms in the LICENSE file found in the top-level directory. - #include "JTopologyBuilder.h" -#include "JEventSourceArrow.h" -#include "JEventMapArrow.h" -#include "JEventTapArrow.h" +#include +#include + +#include "JANA/Utils/JEventLevel.h" +#include "JSourceArrow.h" +#include "JMapArrow.h" +#include "JTapArrow.h" #include "JUnfoldArrow.h" #include "JFoldArrow.h" #include +#include #include -#include -#include JTopologyBuilder::JTopologyBuilder() { @@ -32,11 +34,62 @@ JTopologyBuilder::~JTopologyBuilder() { } } -std::string JTopologyBuilder::print_topology() { +void JTopologyBuilder::AddArrow(JArrow* arrow) { + arrows.push_back(arrow); + auto it = arrow_lookup.find(arrow->GetName()); + if (it != arrow_lookup.end()) { + throw JException("AddArrow(): Arrow with name '%s' has already been added", arrow->GetName().c_str()); + } + arrow_lookup[arrow->GetName()] = arrow; +} + + +JEventPool* JTopologyBuilder::GetOrCreatePool(JEventLevel level) { + auto pool_it = pool_lookup.find(level); + if (pool_it != pool_lookup.end()) { + return pool_it->second; + } + else { + auto* pool = new JEventPool(m_components, m_max_inflight_events[level], m_location_count, level); + pools.push_back(pool); + pool_lookup[level] = pool; + return pool; + } +} + +void JTopologyBuilder::ConnectPool(std::string arrow_name, std::string port_name, JEventLevel level) { + auto& arrow = *arrow_lookup.at(arrow_name); + auto port_index = arrow.GetPortIndex(port_name); + auto& port = arrow.GetPort(port_index); + auto* pool = GetOrCreatePool(level); + port.Attach(pool); +} + +void JTopologyBuilder::ConnectPool(JEventLevel upstream_level, JEventLevel downstream_level) { + auto* upstream_pool = GetOrCreatePool(upstream_level); + auto* downstream_pool = GetOrCreatePool(downstream_level); + upstream_pool->AttachForwardingPool(downstream_pool); +} + +void JTopologyBuilder::ConnectQueue(std::string upstream_arrow_name, + std::string upstream_port_name, + std::string downstream_arrow_name, + std::string downstream_port_name) { + + auto& upstream_arrow = *arrow_lookup.at(upstream_arrow_name); + auto upstream_port_id = upstream_arrow.GetPortIndex(upstream_port_name); + auto& downstream_arrow = *arrow_lookup.at(downstream_arrow_name); + auto downstream_port_id = downstream_arrow.GetPortIndex(downstream_port_name); + + Connect(&upstream_arrow, upstream_port_id, + &downstream_arrow, downstream_port_id); +} + +std::string JTopologyBuilder::PrintTopology() { JTablePrinter t; t.AddColumn("Arrow", JTablePrinter::Justify::Left, 0); t.AddColumn("Parallel", JTablePrinter::Justify::Center, 0); - t.AddColumn("Direction", JTablePrinter::Justify::Left, 0); + t.AddColumn("Port", JTablePrinter::Justify::Left, 0); t.AddColumn("Place", JTablePrinter::Justify::Left, 0); t.AddColumn("ID", JTablePrinter::Justify::Left, 0); @@ -59,19 +112,19 @@ std::string JTopologyBuilder::print_topology() { for (JArrow* arrow : arrows) { show_row = true; - for (JArrow::Port& port : arrow->m_ports) { + for (auto& port : arrow->m_ports) { if (show_row) { - t | arrow->get_name(); - t | arrow->is_parallel(); + t | arrow->GetName(); + t | arrow->IsParallel(); show_row = false; } else { t | "" | "" ; } - auto place_index = lookup[(port.queue!=nullptr) ? (void*) port.queue : (void*) port.pool]; + auto place_index = lookup[(port->GetQueue()!=nullptr) ? (void*) port->GetQueue() : (void*) port->GetPool()]; - t | ((port.is_input) ? "Input ": "Output"); - t | ((port.queue != nullptr) ? "Queue ": "Pool"); + t | port->GetName(); + t | ((port->GetQueue() != nullptr) ? "Queue ": "Pool"); t | place_index; } } @@ -83,24 +136,24 @@ std::string JTopologyBuilder::print_topology() { /// It provides an 'empty' JArrowTopology which has been furnished with a pointer to the JComponentManager, the JEventPool, /// and the JProcessorMapping (in case you care about NUMA details). However, it does not contain any queues or arrows. /// You have to furnish those yourself. -void JTopologyBuilder::set_configure_fn(std::function configure_fn) { +void JTopologyBuilder::SetConfigureFn(std::function configure_fn) { m_configure_topology = std::move(configure_fn); } -void JTopologyBuilder::create_topology() { +void JTopologyBuilder::CreateTopology() { mapping.initialize(static_cast(m_affinity), static_cast(m_locality)); if (m_configure_topology) { - m_configure_topology(*this); - LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << print_topology() << LOG_END; + m_configure_topology(*this, *m_components); + LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << PrintTopology() << LOG_END; } else { - attach_level(JEventLevel::Run, nullptr, nullptr); - LOG_INFO(GetLogger()) << "Arrow topology is:\n" << print_topology() << LOG_END; + AttachLevel(JEventLevel::Run, nullptr, nullptr); + LOG_INFO(GetLogger()) << "Arrow topology is:\n" << PrintTopology() << LOG_END; } for (auto* arrow : arrows) { - arrow->set_logger(GetLogger()); + arrow->SetLogger(GetLogger()); } // _Don't_ establish ordering if nobody needs it! @@ -130,17 +183,38 @@ void JTopologyBuilder::Init() { // We default event pool size to be equal to nthreads // We parse the 'nthreads' parameter two different ways for backwards compatibility. + size_t nthreads = 1; if (m_params->Exists("nthreads")) { if (m_params->GetParameterValue("nthreads") == "Ncores") { - m_max_inflight_events = JCpuInfo::GetNumCpus(); + nthreads = JCpuInfo::GetNumCpus(); } else { - m_max_inflight_events = m_params->GetParameterValue("nthreads"); + nthreads = m_params->GetParameterValue("nthreads"); } } - m_params->SetDefaultParameter("jana:max_inflight_events", m_max_inflight_events, - "The number of events which may be in-flight at once. Should be at least `nthreads` to prevent starvation; more gives better load balancing.") - ->SetIsAdvanced(true); + m_max_inflight_events[JEventLevel::Run] = m_params->RegisterParameter("jana:max_inflight_runs", nthreads, + "The number of runs which may be in-flight at once."); + + m_max_inflight_events[JEventLevel::Subrun] = m_params->RegisterParameter("jana:max_inflight_subruns", nthreads, + "The number of subruns which may be in-flight at once."); + + m_max_inflight_events[JEventLevel::Timeslice] = m_params->RegisterParameter("jana:max_inflight_timeslices", nthreads, + "The number of timeslices which may be in-flight at once."); + + m_max_inflight_events[JEventLevel::Block] = m_params->RegisterParameter("jana:max_inflight_blocks", nthreads, + "The number of blocks which may be in-flight at once."); + + m_max_inflight_events[JEventLevel::SlowControls] = m_params->RegisterParameter("jana:max_inflight_slowcontrols", nthreads, + "The number of slow control events which may be in-flight at once."); + + m_max_inflight_events[JEventLevel::PhysicsEvent] = m_params->RegisterParameter("jana:max_inflight_events", nthreads, + "The number of physics events which may be in-flight at once. Should be at least `nthreads` to prevent starvation; more gives better load balancing."); + + m_max_inflight_events[JEventLevel::Subevent] = m_params->RegisterParameter("jana:max_inflight_subevents", 4*nthreads, + "The number of subevents which may be in-flight at once."); + + m_max_inflight_events[JEventLevel::Task] = m_params->RegisterParameter("jana:max_inflight_tasks", 8*nthreads, + "The number of tasks which may be in-flight at once."); /* m_params->SetDefaultParameter("jana:enable_stealing", m_enable_stealing, @@ -156,48 +230,75 @@ void JTopologyBuilder::Init() { }; -void JTopologyBuilder::connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id) { +void JTopologyBuilder::Connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id) { - JEventQueue* queue = nullptr; + JArrow::Port& upstream_port = upstream->GetPort(upstream_port_id); + JArrow::Port& downstream_port = downstream->GetPort(downstream_port_id); + + LOG_DEBUG(GetLogger()) << "Connecting arrows: " << upstream->GetName() << ":" << upstream_port.GetName() << " --> " << downstream->GetName() << ":" << downstream_port.GetName(); - JArrow::Port& downstream_port = downstream->m_ports.at(downstream_port_id); - if (downstream_port.queue != nullptr) { - // If the queue already exists, use that! - queue = downstream_port.queue; + // Enforce that multiple upstreams can share a downstream, but not vice versa + if (upstream_port.GetQueue() != nullptr) { + throw JException("Upstream port '%s' on arrow '%s' already has a queue", upstream_port.GetName().c_str(), upstream->GetName().c_str()); + } + + // Enforce that any event levels that are produced upstream must be accepted downstream + for (auto level: upstream_port.GetLevels()) { + bool level_found = false; + for (auto downstream_level : downstream_port.GetLevels()) { + if (downstream_level == level) { + level_found = true; + break; + } + } + if (!level_found) { + LOG_FATAL(GetLogger()) << "Level " << toString(level) << " produced upstream but not accepted downstream: " + << upstream->GetName() << ":" << upstream_port.GetName() << " --> " << downstream->GetName() << ":" << downstream_port.GetName(); + throw JException("Level produced upstream but not accepted downstream"); + } + } + + JEventQueue* queue = nullptr; + if (downstream_port.GetQueue() != nullptr) { + queue = downstream_port.GetQueue(); } else { - // Create a new queue - queue = new JEventQueue(m_max_inflight_events, mapping.get_loc_count()); - downstream_port.queue = queue; + // Create new queue + size_t queue_capacity = 0; + for (auto level : downstream_port.GetLevels()) { + queue_capacity += m_max_inflight_events[level]; + } + queue = new JEventQueue(queue_capacity, mapping.get_loc_count()); queues.push_back(queue); + downstream_port.Attach(queue); } - downstream_port.pool = nullptr; - if (downstream_port.enforces_ordering) { + + + + upstream_port.Attach(queue); + + if (downstream_port.GetEnforcesOrdering()) { queue->SetEnforcesOrdering(); } - - JArrow::Port& upstream_port = upstream->m_ports.at(upstream_port_id); - upstream_port.queue = queue; - if (upstream_port.establishes_ordering) { + if (upstream_port.GetEstablishesOrdering()) { queue->SetEstablishesOrdering(true); } - upstream_port.pool = nullptr; } -void JTopologyBuilder::connect_to_first_available(JArrow* upstream, size_t upstream_port, std::vector> downstreams) { +void JTopologyBuilder::ConnectToFirstAvailable(JArrow* upstream, size_t upstream_port, std::vector> downstreams) { for (auto& [downstream, downstream_port_id] : downstreams) { if (downstream != nullptr) { - connect(upstream, upstream_port, downstream, downstream_port_id); + Connect(upstream, upstream_port, downstream, downstream_port_id); return; } } } -std::pair JTopologyBuilder::create_tap_chain(std::vector& procs, std::string level) { +std::pair JTopologyBuilder::CreateTapChain(std::vector& procs, std::string level) { - JEventTapArrow* first = nullptr; - JEventTapArrow* last = nullptr; + JTapArrow* first = nullptr; + JTapArrow* last = nullptr; int i=1; std::string arrow_name = level + "Tap"; @@ -205,14 +306,14 @@ std::pair JTopologyBuilder::create_tap_chain(s if (procs.size() > 1) { arrow_name += std::to_string(i++); } - JEventTapArrow* current = new JEventTapArrow(arrow_name); - current->add_processor(proc); + JTapArrow* current = new JTapArrow(arrow_name, proc->GetLevel()); + current->AddProcessor(proc); arrows.push_back(current); if (first == nullptr) { first = current; } if (last != nullptr) { - connect(last, JEventTapArrow::EVENT_OUT, current, JEventTapArrow::EVENT_IN); + Connect(last, JTapArrow::EVENT_OUT, current, JTapArrow::EVENT_IN); } last = current; } @@ -220,7 +321,7 @@ std::pair JTopologyBuilder::create_tap_chain(s } -void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder) { +void JTopologyBuilder::AttachLevel(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder) { std::stringstream ss; ss << current_level; auto level_str = ss.str(); @@ -274,7 +375,7 @@ void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* par LOG_WARN(GetLogger()) << "No sources found: Processing topology will be empty." << LOG_END; return; } - return attach_level(next, nullptr, nullptr); + return AttachLevel(next, nullptr, nullptr); } // Enforce constraints on what our builder will accept (at least for now) @@ -295,20 +396,20 @@ void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* par // -------------------------- // 0. Pool // -------------------------- - LOG_INFO(GetLogger()) << "Creating event pool with level=" << current_level << " and size=" << m_max_inflight_events; - JEventPool* pool_at_level = new JEventPool(m_components, m_max_inflight_events, m_location_count, current_level); + LOG_INFO(GetLogger()) << "Creating event pool with level=" << current_level << " and size=" << m_max_inflight_events[current_level]; + JEventPool* pool_at_level = new JEventPool(m_components, m_max_inflight_events[current_level], m_location_count, current_level); pools.push_back(pool_at_level); // Hand over ownership of the pool to the topology - LOG_INFO(GetLogger()) << "Created event pool with level=" << current_level << " and size=" << m_max_inflight_events; + LOG_INFO(GetLogger()) << "Finished creating event pool"; // -------------------------- // 1. Source // -------------------------- - JEventSourceArrow* src_arrow = nullptr; + JSourceArrow* src_arrow = nullptr; bool need_source = !sources_at_level.empty(); if (need_source) { - src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level); - src_arrow->attach(pool_at_level, JEventSourceArrow::EVENT_IN); - src_arrow->attach(pool_at_level, JEventSourceArrow::EVENT_OUT); + src_arrow = new JSourceArrow(level_str+"Source", current_level, sources_at_level); + src_arrow->GetPort(JSourceArrow::EVENT_IN).Attach(pool_at_level); + src_arrow->GetPort(JSourceArrow::EVENT_OUT).Attach(pool_at_level); arrows.push_back(src_arrow); } @@ -320,21 +421,21 @@ void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* par have_parallel_sources |= source->IsProcessParallelEnabled(); } bool have_unfolder = !unfolders_at_level.empty(); - JEventMapArrow* map1_arrow = nullptr; + JMapArrow* map1_arrow = nullptr; bool need_map1 = (have_parallel_sources || have_unfolder); if (need_map1) { - map1_arrow = new JEventMapArrow(level_str+"Map1"); + map1_arrow = new JMapArrow(level_str+"Map1", current_level); for (JEventSource* source: sources_at_level) { if (source->IsProcessParallelEnabled()) { - map1_arrow->add_source(source); + map1_arrow->AddSource(source); } } for (JEventUnfolder* unf: unfolders_at_level) { - map1_arrow->add_unfolder(unf); + map1_arrow->AddUnfolder(unf); } - map1_arrow->attach(pool_at_level, JEventMapArrow::EVENT_IN); - map1_arrow->attach(pool_at_level, JEventMapArrow::EVENT_OUT); + map1_arrow->GetPort(JMapArrow::EVENT_IN).Attach(pool_at_level); + map1_arrow->GetPort(JMapArrow::EVENT_OUT).Attach(pool_at_level); arrows.push_back(map1_arrow); } @@ -345,7 +446,7 @@ void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* par bool need_unfold = have_unfolder; if (need_unfold) { unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0]); - unfold_arrow->attach(pool_at_level, JUnfoldArrow::REJECTED_PARENT_OUT); + unfold_arrow->GetPort(JUnfoldArrow::REJECTED_PARENT_OUT).Attach(pool_at_level); arrows.push_back(unfold_arrow); } @@ -357,20 +458,20 @@ void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* par if(need_fold) { fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel()); arrows.push_back(fold_arrow); - fold_arrow->attach(pool_at_level, JFoldArrow::PARENT_OUT); + fold_arrow->GetPort(JFoldArrow::PARENT_OUT).Attach(pool_at_level); } // -------------------------- // 5. Map2 // -------------------------- - JEventMapArrow* map2_arrow = nullptr; + JMapArrow* map2_arrow = nullptr; bool need_map2 = !mappable_procs_at_level.empty(); if (need_map2) { - map2_arrow = new JEventMapArrow(level_str+"Map2"); + map2_arrow = new JMapArrow(level_str+"Map2", current_level); for (JEventProcessor* proc : mappable_procs_at_level) { - map2_arrow->add_processor(proc); - map2_arrow->attach(pool_at_level, JEventMapArrow::EVENT_IN); - map2_arrow->attach(pool_at_level, JEventMapArrow::EVENT_OUT); + map2_arrow->AddProcessor(proc); + map2_arrow->GetPort(JMapArrow::EVENT_IN).Attach(pool_at_level); + map2_arrow->GetPort(JMapArrow::EVENT_OUT).Attach(pool_at_level); } arrows.push_back(map2_arrow); } @@ -378,13 +479,13 @@ void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* par // -------------------------- // 6. Tap // -------------------------- - JEventTapArrow* first_tap_arrow = nullptr; - JEventTapArrow* last_tap_arrow = nullptr; + JTapArrow* first_tap_arrow = nullptr; + JTapArrow* last_tap_arrow = nullptr; bool need_tap = !tappable_procs_at_level.empty(); if (need_tap) { - std::tie(first_tap_arrow, last_tap_arrow) = create_tap_chain(tappable_procs_at_level, level_str); - first_tap_arrow->attach(pool_at_level, JEventTapArrow::EVENT_IN); - last_tap_arrow->attach(pool_at_level, JEventTapArrow::EVENT_OUT); + std::tie(first_tap_arrow, last_tap_arrow) = CreateTapChain(tappable_procs_at_level, level_str); + first_tap_arrow->GetPort(JTapArrow::EVENT_IN).Attach(pool_at_level); + last_tap_arrow->GetPort(JTapArrow::EVENT_OUT).Attach(pool_at_level); } @@ -393,57 +494,57 @@ void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* par // 1. Source // -------------------------- if (parent_unfolder != nullptr) { - parent_unfolder->attach(pool_at_level, JUnfoldArrow::CHILD_IN); - connect_to_first_available(parent_unfolder, JUnfoldArrow::CHILD_OUT, - {{map1_arrow, JEventMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); + parent_unfolder->GetPort(JUnfoldArrow::CHILD_IN).Attach(pool_at_level); + ConnectToFirstAvailable(parent_unfolder, JUnfoldArrow::CHILD_OUT, + {{map1_arrow, JMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); } if (src_arrow != nullptr) { - connect_to_first_available(src_arrow, JEventSourceArrow::EVENT_OUT, - {{map1_arrow, JEventMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); + ConnectToFirstAvailable(src_arrow, JSourceArrow::EVENT_OUT, + {{map1_arrow, JMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); } if (map1_arrow != nullptr) { - connect_to_first_available(map1_arrow, JEventMapArrow::EVENT_OUT, - {{unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); + ConnectToFirstAvailable(map1_arrow, JMapArrow::EVENT_OUT, + {{unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); } if (unfold_arrow != nullptr) { - connect_to_first_available(unfold_arrow, JUnfoldArrow::REJECTED_PARENT_OUT, - {{map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); + ConnectToFirstAvailable(unfold_arrow, JUnfoldArrow::REJECTED_PARENT_OUT, + {{map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); } if (fold_arrow != nullptr) { - connect_to_first_available(fold_arrow, JFoldArrow::CHILD_OUT, - {{map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); + ConnectToFirstAvailable(fold_arrow, JFoldArrow::PARENT_OUT, + {{map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); } if (map2_arrow != nullptr) { - connect_to_first_available(map2_arrow, JEventMapArrow::EVENT_OUT, - {{first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); + ConnectToFirstAvailable(map2_arrow, JMapArrow::EVENT_OUT, + {{first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); } if (last_tap_arrow != nullptr) { - connect_to_first_available(last_tap_arrow, JEventTapArrow::EVENT_OUT, + ConnectToFirstAvailable(last_tap_arrow, JTapArrow::EVENT_OUT, {{parent_folder, JFoldArrow::CHILD_IN}}); } if (parent_folder != nullptr) { - parent_folder->attach(pool_at_level, JFoldArrow::CHILD_OUT); + parent_folder->GetPort(JFoldArrow::CHILD_OUT).Attach(pool_at_level); } // Finally, we recur over lower levels! if (need_unfold) { auto next_level = unfolders_at_level[0]->GetChildLevel(); - attach_level(next_level, unfold_arrow, fold_arrow); + AttachLevel(next_level, unfold_arrow, fold_arrow); } else { // This is the lowest level // TODO: Improve logic for determining event counts for multilevel topologies if (last_tap_arrow != nullptr) { - last_tap_arrow->set_is_sink(true); + last_tap_arrow->SetIsSink(true); } else if (map2_arrow != nullptr) { - map2_arrow->set_is_sink(true); + map2_arrow->SetIsSink(true); } else if (map1_arrow != nullptr) { - map1_arrow->set_is_sink(true); + map1_arrow->SetIsSink(true); } else if (src_arrow != nullptr) { - src_arrow->set_is_sink(true); + src_arrow->SetIsSink(true); } } } diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.h b/src/libraries/JANA/Topology/JTopologyBuilder.h index dd22f2225..744d2ec72 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.h +++ b/src/libraries/JANA/Topology/JTopologyBuilder.h @@ -5,13 +5,14 @@ #pragma once #include -#include -#include -#include -#include +#include #include #include +#include +#include +#include +#include class JParameterManager; @@ -19,10 +20,9 @@ class JComponentManager; class JArrow; class JFoldArrow; class JUnfoldArrow; -class JEventTapArrow; +class JTapArrow; class JTopologyBuilder : public JService { -public: // Services Service m_params {this}; std::shared_ptr m_components; @@ -31,16 +31,18 @@ class JTopologyBuilder : public JService { std::vector arrows; std::vector queues; // Queues shared between arrows std::vector pools; // Pools shared between arrows - + + std::map arrow_lookup; + std::map pool_lookup; + // Topology configuration - size_t m_max_inflight_events = 1; + std::map m_max_inflight_events; size_t m_location_count = 1; - bool m_enable_stealing = false; + //bool m_enable_stealing = false; int m_affinity = 0; int m_locality = 0; - // Things that probably shouldn't be here - std::function m_configure_topology; + std::function m_configure_topology; JProcessorMapping mapping; public: @@ -50,22 +52,35 @@ class JTopologyBuilder : public JService { void Init() override; - /// set_cofigure_fn lets the user provide a lambda that sets up a topology after all components have been loaded. - /// It provides an 'empty' JArrowTopology which has been furnished with a pointer to the JComponentManager, the JEventPool, - /// and the JProcessorMapping (in case you care about NUMA details). However, it does not contain any queues or arrows. - /// You have to furnish those yourself. - void set_configure_fn(std::function configure_fn); + void AddArrow(JArrow* arrow); + + void ConnectQueue(std::string upstream_arrow_name, std::string upstream_port_name, + std::string downstream_arrow_name, std::string downstream_port_name); + + void ConnectPool(std::string arrow_name, std::string port_name, JEventLevel level); + + void ConnectPool(JEventLevel upstream_level, JEventLevel downstream_level); + - void create_topology(); + /// SetConfigureFn() lets the user manually set up a topology after all components have been loaded. + /// This is meant to be used with AddArrow(), ConnectPool(), and ConnectQueue(). + void SetConfigureFn(std::function configure_fn); - void attach_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder); - void connect_to_first_available(JArrow* upstream, size_t upstream_port_id, std::vector> downstreams); - void connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id); - std::pair create_tap_chain(std::vector& procs, std::string name); + void CreateTopology(); - std::string print_topology(); + std::string PrintTopology(); + const std::vector& GetArrows() { return arrows; }; + const std::vector& GetPools() { return pools; }; + const std::vector& GetQueues() { return queues; }; + const JProcessorMapping& GetProcessorMapping() { return mapping; }; +private: + void AttachLevel(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder); + void ConnectToFirstAvailable(JArrow* upstream, size_t upstream_port_id, std::vector> downstreams); + void Connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id); + std::pair CreateTapChain(std::vector& procs, std::string name); + JEventPool* GetOrCreatePool(JEventLevel level); }; diff --git a/src/libraries/JANA/Topology/JUnfoldArrow.h b/src/libraries/JANA/Topology/JUnfoldArrow.h index 173d0d6e8..7d69b4c6d 100644 --- a/src/libraries/JANA/Topology/JUnfoldArrow.h +++ b/src/libraries/JANA/Topology/JUnfoldArrow.h @@ -17,23 +17,30 @@ class JUnfoldArrow : public JArrow { public: JUnfoldArrow(std::string name, JEventUnfolder* unfolder) : m_unfolder(unfolder) { - set_name(name); - create_ports(2, 2); - m_next_input_port = PARENT_IN; - m_ports[CHILD_OUT].establishes_ordering=true; // Just in case there's a folder that needs this + SetName(name); + auto parent_level = unfolder->GetLevel(); + auto child_level = unfolder->GetChildLevel(); + AddPort("parent_in", parent_level); + AddPort("child_in", child_level); + AddPort("child_out", child_level).SetEstablishesOrdering(true); + // Just in case there's a folder that needs this. + // establishes_ordering is cheap; enforces_ordering is the expensive one + + AddPort("rejected_parent_out", parent_level); + m_next_input_port = GetPortIndex("parent_in"); } - void initialize() final { + void Initialize() final { m_unfolder->DoInit(); LOG_INFO(m_logger) << "Initialized JEventUnfolder '" << m_unfolder->GetTypeName() << "'" << LOG_END; } - void finalize() final { + void Finalize() final { m_unfolder->DoFinish(); LOG_INFO(m_logger) << "Finalized JEventUnfolder '" << m_unfolder->GetTypeName() << "'" << LOG_END; } - void fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) final { + void Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) final { // Take whatever we were given if (this->m_next_input_port == PARENT_IN) { diff --git a/src/libraries/JANA/Utils/JApplicationInspector.cc b/src/libraries/JANA/Utils/JApplicationInspector.cc index 368a0f516..d30a62fcd 100644 --- a/src/libraries/JANA/Utils/JApplicationInspector.cc +++ b/src/libraries/JANA/Utils/JApplicationInspector.cc @@ -27,7 +27,7 @@ void PrintMenu() { void InspectTopology(JApplication* app) { auto topology = app->GetService(); - std::cout << topology->print_topology() << std::endl; + std::cout << topology->PrintTopology() << std::endl; } void Fire(JApplication* app, int arrow_id) { diff --git a/src/programs/CMakeLists.txt b/src/programs/CMakeLists.txt new file mode 100644 index 000000000..759fa92e6 --- /dev/null +++ b/src/programs/CMakeLists.txt @@ -0,0 +1,10 @@ + +add_subdirectory(jana) + +if (${BUILD_TESTS}) + add_subdirectory(unit_tests) + add_subdirectory(integration_tests) + add_subdirectory(perf_tests) +endif() + + diff --git a/src/programs/integration_tests/BatchedArrow.cc b/src/programs/integration_tests/BatchedArrow.cc new file mode 100644 index 000000000..14dbb5873 --- /dev/null +++ b/src/programs/integration_tests/BatchedArrow.cc @@ -0,0 +1,141 @@ +#include + +#include +#include +#include +#include +#include +#include + +class BatchedArrow : public JArrow { + + size_t m_batch_size = 5; + std::deque m_batched_events; + +public: + BatchedArrow(JEventLevel level) { + SetName("BatchedArrow"); + SetIsParallel(false); + AddPort("in", level); + AddPort("out", level); + } + + void SetBatchSize(int batch_size) { m_batch_size = batch_size; } + + virtual void Batch(const JEvent& evt) { + GetLogger() << "Batching event " << evt.GetEventNumber(); + } + + virtual void Process() { + GetLogger() << "Processing batch containing:"; + for (auto* evt : m_batched_events) { + GetLogger() << " " << evt->GetEventNumber(); + } + } + + virtual void Unbatch(const JEvent& evt) { + GetLogger() << "Unbatching event " << evt.GetEventNumber(); + } + + void Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) override { + + bool releasing_batch = (event == nullptr); + + if (event != nullptr) { + // In this case, we are _filling_ the batch, and processing it once it gets filled + // Populate inputs + Batch(*event); + + m_batched_events.push_back(event); + + if (m_batched_events.size() == m_batch_size) { + releasing_batch = true; + Process(); + for (JEvent* event : m_batched_events) { + Unbatch(*event); + // Publish outputs + } + } + } + if (!releasing_batch) { + GetLogger() << "NOT releasing batch yet"; + m_next_input_port = 0; + output_count = 0; + status = JArrow::FireResult::KeepGoing; + } + else { + for (int i=0; i<2 && !m_batched_events.empty(); ++i) { + JEvent* event = m_batched_events.front(); + m_batched_events.pop_front(); + GetLogger() << "Releasing event " << event->GetEventNumber(); + outputs[i] = {event, 1}; + output_count = i+1; + status = JArrow::FireResult::KeepGoing; + } + + if (m_batched_events.empty()) { + m_next_input_port = 0; + } + else { + m_next_input_port = -1; + } + } + } +}; + +struct BatchedProc : public JEventProcessor { + std::vector observed_event_numbers; + BatchedProc() { + SetCallbackStyle(JFactory::CallbackStyle::ExpertMode); + } + void ProcessSequential(const JEvent& event) override { + LOG_INFO(GetLogger()) << "JEP found event" << event.GetEventNumber(); + observed_event_numbers.push_back(event.GetEventNumber()); + } + void Finish() override { + LOG_INFO(GetLogger()) << "BatchedProc observed event numbers"; + for (int x: observed_event_numbers) { + LOG_INFO(GetLogger()) << " " << x; + } + } +}; + + +void configure_batched_topology(JTopologyBuilder& builder, JComponentManager& component_manager) { + + auto* src_arrow = new JSourceArrow("PhysicsEventSource", JEventLevel::PhysicsEvent, component_manager.get_evt_srces()); + + BatchedArrow* batched_arrow = new BatchedArrow(JEventLevel::PhysicsEvent); + + JTapArrow* tap_arrow = new JTapArrow("PhysicsEventTap", JEventLevel::PhysicsEvent); + for (auto proc : component_manager.get_evt_procs()) { + tap_arrow->AddProcessor(proc); + } + + builder.AddArrow(src_arrow); + builder.AddArrow(batched_arrow); + builder.AddArrow(tap_arrow); + builder.ConnectPool("PhysicsEventSource", "in", JEventLevel::PhysicsEvent); + builder.ConnectQueue("PhysicsEventSource", "out", "BatchedArrow", "in"); + builder.ConnectQueue("BatchedArrow", "out", "PhysicsEventTap", "in"); + builder.ConnectPool("PhysicsEventTap", "out", JEventLevel::PhysicsEvent); +} + + + +TEST_CASE("BatchedArrow") { + JApplication app; + app.Add(new JEventSource); + app.Add(new BatchedProc); + app.SetParameterValue("jana:nevents", 49); + app.SetParameterValue("nthreads", 1); + app.SetParameterValue("jana:max_inflight_events", 20); + app.SetParameterValue("jana:log:show_threadstamp", 1); + app.SetParameterValue("jana:loglevel", "TRACE"); + + app.GetService()->SetConfigureFn(configure_batched_topology); + app.Run(); +} + + + diff --git a/src/programs/integration_tests/CMakeLists.txt b/src/programs/integration_tests/CMakeLists.txt new file mode 100644 index 000000000..acba6b103 --- /dev/null +++ b/src/programs/integration_tests/CMakeLists.txt @@ -0,0 +1,10 @@ + + +set(JANA2_INTEGRATION_TEST_SOURCES + SimpleOffloading.cc + BatchedArrow.cc +) + +add_jana_test(jana-integration-tests SOURCES ${JANA2_INTEGRATION_TEST_SOURCES}) + + diff --git a/src/programs/integration_tests/SimpleOffloading.cc b/src/programs/integration_tests/SimpleOffloading.cc new file mode 100644 index 000000000..e27ae0fa7 --- /dev/null +++ b/src/programs/integration_tests/SimpleOffloading.cc @@ -0,0 +1,173 @@ + +#define CATCH_CONFIG_MAIN +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// This integration test covers the end-to-end testing of a GPU override +// We set this up so that we have the following factory chain: +// A (cpu) -> B (gpu) -> C (cpu) + +struct A {int a; }; +struct B {int b; }; +struct C {int c; }; + +struct AFac : public JFactory { + Output a_out {this}; + void Process(const JEvent& event) override { + LOG_INFO(GetLogger()) << "Running AFac (hopefully on CPU)" << LOG_END; + A* a = new A; + a->a = event.GetEventNumber() + 1; + a_out().push_back(a); + } +}; + +struct BFac : public JFactory { + Input a_in {this}; + Output b_out {this}; + void Process(const JEvent&) override { + LOG_INFO(GetLogger()) << "Running BFac (hopefully on GPU)" << LOG_END; + auto* a = a_in->at(0); + B* b = new B; + b->b = a->a * 2; + b_out().push_back(b); + } +}; + +struct CFac : public JFactory { + Input b_in {this}; + Output c_out {this}; + void Process(const JEvent&) override { + LOG_INFO(GetLogger()) << "Running CFac (hopefully on CPU)" << LOG_END; + auto* b = b_in->at(0); + C* c = new C; + c->c = b->b + 4; + c_out().push_back(c); + } +}; + +struct Proc : public JEventProcessor { + Input c_in {this}; + Proc() { + SetCallbackStyle(JFactory::CallbackStyle::ExpertMode); + } + void ProcessSequential(const JEvent& event) override { + LOG_INFO(GetLogger()) << "Retrieving C (hopefully on CPU)" << LOG_END; + auto* c = c_in->at(0); + auto evtnr = event.GetEventNumber(); + int expected = ((evtnr + 1) * 2) + 4; + LOG_INFO(GetLogger()) << "Evt nr " << evtnr << ": " << "Expected " << expected << ", found " << c->c << std::endl; + REQUIRE(expected == c->c); + } +}; + + +struct TriggerFactoryInputsArrow : public JArrow { + std::string unique_name; + + TriggerFactoryInputsArrow(JEventLevel level) { + SetName("trigger"); + SetIsParallel(true); + AddPort("in", level); + AddPort("out", level); + } + + void Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) override { + auto* fac = event->GetFactorySet()->GetDatabundle(unique_name)->GetFactory(); + for (auto* input : fac->GetInputs()) { + input->TriggerFactoryCreate(*event); + } + for (auto* input : fac->GetVariadicInputs()) { + input->TriggerFactoryCreate(*event); + } + LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " for event# " << event->GetEventNumber() << LOG_END; + outputs[0] = {event, 1}; + output_count = 1; + status = JArrow::FireResult::KeepGoing; + } +}; + +struct OffloadArrow : public JArrow { + std::string unique_name; + OffloadArrow(JEventLevel level) { + SetName("offload"); + SetIsParallel(false); + AddPort("in", level); + AddPort("out", level); + } + + ~OffloadArrow() override {} + + void Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) override { + + event->GetFactorySet()->GetDatabundle(unique_name)->GetFactory()->Create(*event); + + LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " for event# " << event->GetEventNumber() << LOG_END; + outputs[0] = {event, 1}; + output_count = 1; + status = JArrow::FireResult::KeepGoing; + } +}; + + +void configure_topology(JTopologyBuilder& builder, JComponentManager& components) { + + auto* src_arrow = new JSourceArrow("src", JEventLevel::PhysicsEvent, components.get_evt_srces()); + + TriggerFactoryInputsArrow* trigger_inputs_arrow = new TriggerFactoryInputsArrow(JEventLevel::PhysicsEvent); + trigger_inputs_arrow->unique_name = "B"; + + OffloadArrow* offload_arrow = new OffloadArrow(JEventLevel::PhysicsEvent); + offload_arrow->unique_name = "B"; + + JMapArrow* map_arrow = new JMapArrow("map", JEventLevel::PhysicsEvent); + for (auto proc : components.get_evt_procs()) { + map_arrow->AddProcessor(proc); + } + + JTapArrow* tap_arrow = new JTapArrow("tap", JEventLevel::PhysicsEvent); + for (auto proc : components.get_evt_procs()) { + tap_arrow->AddProcessor(proc); + } + + builder.AddArrow(src_arrow); + builder.AddArrow(trigger_inputs_arrow); + builder.AddArrow(offload_arrow); + builder.AddArrow(map_arrow); + builder.AddArrow(tap_arrow); + + builder.ConnectPool("src", "in", JEventLevel::PhysicsEvent); + builder.ConnectPool("tap", "out", JEventLevel::PhysicsEvent); + + builder.ConnectQueue("src", "out", "trigger", "in"); + builder.ConnectQueue("trigger", "out", "offload", "in"); + builder.ConnectQueue("offload", "out", "map", "in"); + builder.ConnectQueue("map", "out", "tap", "in"); +} + + +TEST_CASE("SimpleOffloading") { + JApplication app; + app.Add(new JFactoryGeneratorT()); + app.Add(new JFactoryGeneratorT()); + app.Add(new JFactoryGeneratorT()); + app.Add(new JEventSource); + app.Add(new Proc); + app.SetParameterValue("jana:nevents", 3); + app.SetParameterValue("nthreads", 2); + app.SetParameterValue("jana:log:show_threadstamp", 1); + app.SetParameterValue("jana:loglevel", "DEBUG"); + + auto builder = app.GetService(); + builder->SetConfigureFn(configure_topology); + app.Run(); +} + + + diff --git a/src/programs/unit_tests/Components/UnfoldTests.cc b/src/programs/unit_tests/Components/UnfoldTests.cc index ef7736ac2..fcb94e669 100644 --- a/src/programs/unit_tests/Components/UnfoldTests.cc +++ b/src/programs/unit_tests/Components/UnfoldTests.cc @@ -69,13 +69,13 @@ TEST_CASE("UnfoldTests_Basic") { TestUnfolder unfolder; JUnfoldArrow arrow("sut", &unfolder); - arrow.attach(&parent_queue, JUnfoldArrow::PARENT_IN); - arrow.attach(&child_pool, JUnfoldArrow::CHILD_IN); - arrow.attach(&child_queue, JUnfoldArrow::CHILD_OUT); + arrow.GetPort(JUnfoldArrow::PARENT_IN).Attach(&parent_queue); + arrow.GetPort(JUnfoldArrow::CHILD_IN).Attach(&child_pool); + arrow.GetPort(JUnfoldArrow::CHILD_OUT).Attach(&child_queue); - arrow.initialize(); - arrow.execute( 0); // First call to execute() picks up the parent and exits early - auto result = arrow.execute( 0); // Second call to execute() picks up the child, calls Unfold(), and emits the newly parented child + arrow.Initialize(); + arrow.Execute( 0); // First call to execute() picks up the parent and exits early + auto result = arrow.Execute( 0); // Second call to execute() picks up the child, calls Unfold(), and emits the newly parented child REQUIRE(result == JArrow::FireResult::KeepGoing); REQUIRE(child_queue.GetSize(0) == 1); REQUIRE(unfolder.preprocessed_event_nrs.size() == 0); @@ -104,10 +104,10 @@ TEST_CASE("FoldArrowTests") { JEventQueue parent_out(5, 1); JFoldArrow arrow("sut", JEventLevel::Timeslice, JEventLevel::PhysicsEvent); - arrow.attach(&child_in, JFoldArrow::CHILD_IN); - arrow.attach(&child_out, JFoldArrow::CHILD_OUT); - arrow.attach(&parent_out, JFoldArrow::PARENT_OUT); - arrow.initialize(); + arrow.GetPort(JFoldArrow::CHILD_IN).Attach(&child_in); + arrow.GetPort(JFoldArrow::CHILD_OUT).Attach(&child_out); + arrow.GetPort(JFoldArrow::PARENT_OUT).Attach(&parent_out); + arrow.Initialize(); SECTION("One-to-one relationship between timeslices and events") { @@ -131,7 +131,7 @@ TEST_CASE("FoldArrowTests") { evt2->SetParent(ts2); child_in.Push(evt2, 0); - arrow.execute(0); + arrow.Execute(0); REQUIRE(child_in.GetSize(0) == 1); REQUIRE(child_out.GetSize(0) == 1); @@ -173,25 +173,25 @@ TEST_CASE("FoldArrowTests") { child_in.Push(evt3, 0); child_in.Push(evt4, 0); - arrow.execute(0); + arrow.Execute(0); REQUIRE(child_in.GetSize(0) == 3); REQUIRE(child_out.GetSize(0) == 1); REQUIRE(parent_out.GetSize(0) == 0); - arrow.execute(0); + arrow.Execute(0); REQUIRE(child_in.GetSize(0) == 2); REQUIRE(child_out.GetSize(0) == 2); REQUIRE(parent_out.GetSize(0) == 1); - arrow.execute(0); + arrow.Execute(0); REQUIRE(child_in.GetSize(0) == 1); REQUIRE(child_out.GetSize(0) == 3); REQUIRE(parent_out.GetSize(0) == 1); - arrow.execute(0); + arrow.Execute(0); REQUIRE(child_in.GetSize(0) == 0); REQUIRE(child_out.GetSize(0) == 4); diff --git a/src/programs/unit_tests/Engine/JExecutionEngineTests.cc b/src/programs/unit_tests/Engine/JExecutionEngineTests.cc index 851b7caef..1f0585cce 100644 --- a/src/programs/unit_tests/Engine/JExecutionEngineTests.cc +++ b/src/programs/unit_tests/Engine/JExecutionEngineTests.cc @@ -117,42 +117,42 @@ TEST_CASE("JExecutionEngine_ExternalWorkers") { sut->ExchangeTask(task, worker.worker_id); REQUIRE(task.arrow != nullptr); - REQUIRE(task.arrow->get_name() == "PhysicsEventSource"); // Only task available at this point! + REQUIRE(task.arrow->GetName() == "PhysicsEventSource"); // Only task available at this point! REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running); REQUIRE(sut->GetPerf().event_count == 0); - task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status); + task.arrow->Fire(task.input_event, task.outputs, task.output_count, task.status); REQUIRE(task.output_count == 1); REQUIRE(task.status == JArrow::FireResult::KeepGoing); sut->ExchangeTask(task, worker.worker_id); REQUIRE(task.arrow != nullptr); - REQUIRE(task.arrow->get_name() == "PhysicsEventSource"); // This will fail due to jana:nevents + REQUIRE(task.arrow->GetName() == "PhysicsEventSource"); // This will fail due to jana:nevents REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running); REQUIRE(sut->GetPerf().event_count == 0); - task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status); + task.arrow->Fire(task.input_event, task.outputs, task.output_count, task.status); REQUIRE(task.output_count == 1); REQUIRE(task.outputs[0].second == 0); // Failure => return to pool REQUIRE(task.status == JArrow::FireResult::Finished); sut->ExchangeTask(task, worker.worker_id); REQUIRE(task.arrow != nullptr); - REQUIRE(task.arrow->get_name() == "PhysicsEventMap2"); + REQUIRE(task.arrow->GetName() == "PhysicsEventMap2"); REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Draining); REQUIRE(sut->GetPerf().event_count == 0); - task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status); + task.arrow->Fire(task.input_event, task.outputs, task.output_count, task.status); REQUIRE(task.output_count == 1); REQUIRE(task.status == JArrow::FireResult::KeepGoing); sut->ExchangeTask(task, worker.worker_id); REQUIRE(task.arrow != nullptr); - REQUIRE(task.arrow->get_name() == "PhysicsEventTap"); + REQUIRE(task.arrow->GetName() == "PhysicsEventTap"); REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Draining); REQUIRE(sut->GetPerf().event_count == 0); - task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status); + task.arrow->Fire(task.input_event, task.outputs, task.output_count, task.status); REQUIRE(task.output_count == 1); REQUIRE(task.status == JArrow::FireResult::KeepGoing); diff --git a/src/programs/unit_tests/Topology/JArrowTests.cc b/src/programs/unit_tests/Topology/JArrowTests.cc index a433236ee..7726c5fda 100644 --- a/src/programs/unit_tests/Topology/JArrowTests.cc +++ b/src/programs/unit_tests/Topology/JArrowTests.cc @@ -9,11 +9,12 @@ struct TestData { int x; }; struct BasicParallelArrow : public JArrow { BasicParallelArrow() { - create_ports(1, 1); - set_is_parallel(true); + AddPort("in", JEventLevel::PhysicsEvent); + AddPort("out", JEventLevel::PhysicsEvent); + SetIsParallel(true); } - void fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& process_status) { + void Fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& process_status) override { input->Insert(new TestData {.x=22}); @@ -36,7 +37,7 @@ TEST_CASE("BasicParallelArrow_Fire") { size_t output_count; JArrow::FireResult status; - sut.fire(event.get(), outputs, output_count, status); + sut.Fire(event.get(), outputs, output_count, status); REQUIRE(event->GetSingle()->x == 22); REQUIRE(output_count == 1); @@ -56,10 +57,10 @@ TEST_CASE("BasicParallelArrow_ExecuteSucceeds") { JEventPool input_pool(app.GetService(), 10, 1); JEventQueue output_queue(10, 1); - sut.attach(&input_pool, 0); - sut.attach(&output_queue, 1); + sut.GetPort(0).Attach(&input_pool); + sut.GetPort(1).Attach(&output_queue); - auto result = sut.execute( 0); + auto result = sut.Execute( 0); REQUIRE(result == JArrow::FireResult::KeepGoing); REQUIRE(output_queue.GetSize(0) == 1); @@ -77,10 +78,10 @@ TEST_CASE("BasicParallelArrow_ExecuteFails") { JEventQueue input_queue(10, 1); JEventQueue output_queue(10, 1); - sut.attach(&input_queue, 0); - sut.attach(&output_queue, 1); + sut.GetPort(0).Attach(&input_queue); + sut.GetPort(1).Attach(&output_queue); - auto result = sut.execute(0); + auto result = sut.Execute(0); REQUIRE(result == JArrow::FireResult::NotRunYet); REQUIRE(output_queue.GetSize(0) == 0); diff --git a/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc b/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc index 0659a0e9c..20534bb2c 100644 --- a/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc +++ b/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc @@ -1,7 +1,7 @@ #include "JANA/JApplicationFwd.h" -#include "JANA/Topology/JEventPool.h" -#include "JANA/Topology/JEventTapArrow.h" +#include "JANA/Services/JComponentManager.h" +#include "JANA/Topology/JTapArrow.h" #include "JANA/Utils/JEventLevel.h" #include @@ -36,43 +36,29 @@ class MyMultiSource : public JEventSource { } }; -void configure_multisource_topology(JTopologyBuilder& builder) { - - auto run_pool = new JEventPool(builder.m_components, 1, 1, JEventLevel::Run); - auto controls_pool = new JEventPool(builder.m_components, 2, 1, JEventLevel::SlowControls); - auto physics_pool = new JEventPool(builder.m_components, 4, 1, JEventLevel::PhysicsEvent); - - builder.pools.push_back(run_pool); - builder.pools.push_back(controls_pool); - builder.pools.push_back(physics_pool); - - physics_pool->AttachForwardingPool(controls_pool); - physics_pool->AttachForwardingPool(run_pool); +void configure_multisource_topology(JTopologyBuilder& builder, JComponentManager& components) { auto* src_arrow = new JMultilevelSourceArrow; - src_arrow->set_name("MultilevelSource"); - src_arrow->SetEventSource(builder.m_components->get_evt_srces().at(0)); - src_arrow->attach(run_pool, src_arrow->GetPortIndex(JEventLevel::Run, JMultilevelSourceArrow::Direction::In)); - src_arrow->attach(controls_pool, src_arrow->GetPortIndex(JEventLevel::SlowControls, JMultilevelSourceArrow::Direction::In)); - src_arrow->attach(physics_pool, src_arrow->GetPortIndex(JEventLevel::PhysicsEvent, JMultilevelSourceArrow::Direction::In)); - - src_arrow->attach(run_pool, src_arrow->GetPortIndex(JEventLevel::Run, JMultilevelSourceArrow::Direction::Out)); - src_arrow->attach(controls_pool, src_arrow->GetPortIndex(JEventLevel::SlowControls, JMultilevelSourceArrow::Direction::Out)); - - JEventTapArrow* tap_arrow = new JEventTapArrow("DeinterleavedTap"); - for (auto proc : builder.m_components->get_evt_procs()) { - tap_arrow->add_processor(proc); - } + src_arrow->SetName("src"); + src_arrow->SetEventSource(components.get_evt_srces().at(0)); - builder.connect(src_arrow, src_arrow->GetPortIndex(JEventLevel::PhysicsEvent, JMultilevelSourceArrow::Direction::Out), - tap_arrow, tap_arrow->EVENT_IN); - - builder.queues.at(0)->Scale(4); // Queue capacity = N(PhysicsEvent) - - tap_arrow->attach(physics_pool, tap_arrow->EVENT_OUT); + JTapArrow* tap_arrow = new JTapArrow("tap"); + for (auto proc : components.get_evt_procs()) { + tap_arrow->AddProcessor(proc); + } - builder.arrows.push_back(src_arrow); - builder.arrows.push_back(tap_arrow); + builder.AddArrow(src_arrow); + builder.AddArrow(tap_arrow); + + builder.ConnectPool("src", "RunIn", JEventLevel::Run); + builder.ConnectPool("src", "SlowControlsIn", JEventLevel::SlowControls); + builder.ConnectPool("src", "PhysicsEventIn", JEventLevel::PhysicsEvent); + builder.ConnectPool("src", "RunOut", JEventLevel::Run); + builder.ConnectPool("src", "SlowControlsOut", JEventLevel::SlowControls); + builder.ConnectQueue("src", "PhysicsEventOut", "tap", "in"); + builder.ConnectPool("tap", "out", JEventLevel::PhysicsEvent); + builder.ConnectPool(JEventLevel::PhysicsEvent, JEventLevel::Run); + builder.ConnectPool(JEventLevel::PhysicsEvent, JEventLevel::SlowControls); } @@ -84,7 +70,7 @@ TEST_CASE("MultilevelSourceCustomTopology") { app.Add(new MyMultiSource); app.Add(new DeinterleavedProc); auto builder = app.GetService(); - builder->set_configure_fn(configure_multisource_topology); + builder->SetConfigureFn(configure_multisource_topology); app.Run(); } diff --git a/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc index 2666c1512..d5f934399 100644 --- a/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc +++ b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc @@ -7,7 +7,7 @@ #include "JANA/Utils/JEventLevel.h" #include -#include + namespace jana { namespace timeslice_tests { @@ -18,6 +18,7 @@ TEST_CASE("TimeslicesTests_FineGrained") { JApplication app; app.SetParameterValue("jana:loglevel", "trace"); app.SetParameterValue("jana:nevents", "5"); + app.SetParameterValue("jana:max_inflight_timeslices", "2"); app.SetParameterValue("jana:max_inflight_events", "4"); app.Add(new MyTimesliceSource); @@ -36,15 +37,15 @@ TEST_CASE("TimeslicesTests_FineGrained") { result = ee->Fire(TS_SRC, 0); REQUIRE(result == JArrow::FireResult::KeepGoing); - REQUIRE(top->arrows[TS_SRC]->get_port(1).queue == top->queues[0]); - REQUIRE(top->pools[0]->GetCapacity() == 4); - REQUIRE(top->pools[0]->GetSize(0) == 3); - REQUIRE(top->queues[0]->GetSize(0) == 1); + REQUIRE(top->GetArrows()[TS_SRC]->GetPort(1).GetQueue() == top->GetQueues()[0]); + REQUIRE(top->GetPools()[0]->GetCapacity() == 2); + REQUIRE(top->GetPools()[0]->GetSize(0) == 1); + REQUIRE(top->GetQueues()[0]->GetSize(0) == 1); result = ee->Fire(TS_MAP, 0); REQUIRE(result == JArrow::FireResult::KeepGoing); - REQUIRE(top->queues[0]->GetSize(0) == 0); - REQUIRE(top->queues[1]->GetSize(0) == 1); + REQUIRE(top->GetQueues()[0]->GetSize(0) == 0); + REQUIRE(top->GetQueues()[1]->GetSize(0) == 1); // Parent result = ee->Fire(TS_UNF, 0); @@ -63,8 +64,8 @@ TEST_CASE("TimeslicesTests_FineGrained") { result = ee->Fire(TS_FLD, 0); REQUIRE(result == JArrow::FireResult::KeepGoing); - REQUIRE(top->pools[0]->GetSize(0) == 3); // Unfolder still has parent - REQUIRE(top->pools[1]->GetSize(0) == 4); // Child returned to pool + REQUIRE(top->GetPools()[0]->GetSize(0) == 1); // Unfolder still has parent + REQUIRE(top->GetPools()[1]->GetSize(0) == 4); // Child returned to pool } diff --git a/src/programs/unit_tests/Topology/SubeventTests.cc b/src/programs/unit_tests/Topology/SubeventTests.cc deleted file mode 100644 index 6a654314b..000000000 --- a/src/programs/unit_tests/Topology/SubeventTests.cc +++ /dev/null @@ -1,132 +0,0 @@ - -// Copyright 2022, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - - -#include - -#include -#include -#include -#include -#include -#include -#include - - -struct MyInput : public JObject { - int x; - float y; - MyInput(int x, float y) : x(x), y(y) {} -}; - -struct MyOutput : public JObject { - float z; - explicit MyOutput(float z) : z(z) {} -}; - -struct MyProcessor : public JSubeventProcessor { - MyProcessor() { - inputTag = ""; - outputTag = "subeventted"; - } - MyOutput* ProcessSubevent(MyInput* input) override { - return new MyOutput(input->y + (float) input->x); - } -}; - -TEST_CASE("Create subevent processor") { - - MyProcessor processor; - MyInput input(22, 7.6); - MyOutput* output = processor.ProcessSubevent(&input); - REQUIRE(output->z == 29.6f); -} - - -TEST_CASE("Basic subevent arrow functionality") { - - MyProcessor processor; - JMailbox events_in; - JMailbox events_out; - JMailbox> subevents_in; - JMailbox> subevents_out; - - auto split_arrow = new JSplitArrow("split", &processor, &events_in, &subevents_in); - auto subprocess_arrow = new JSubeventArrow("subprocess", &processor, &subevents_in, &subevents_out); - auto merge_arrow = new JMergeArrow("merge", &processor, &subevents_out, &events_out); - - SECTION("No-op execute subevent arrows") { - JArrowMetrics m; - split_arrow->execute(m, 0); - merge_arrow->execute(m, 0); - subprocess_arrow->execute(m, 0); - } - - struct SimpleSource : public JEventSource { - SimpleSource() { - SetCallbackStyle(CallbackStyle::ExpertMode); - } - Result Emit(JEvent& event) override { - if (GetEmittedEventCount() == 10) return Result::FailureFinished; - std::vector inputs; - inputs.push_back(new MyInput(22,3.6)); - inputs.push_back(new MyInput(23,3.5)); - inputs.push_back(new MyInput(24,3.4)); - inputs.push_back(new MyInput(25,3.3)); - event.Insert(inputs); - return Result::Success; - } - }; - - struct SimpleProcessor : public JEventProcessor { - SimpleProcessor() { - SetCallbackStyle(CallbackStyle::ExpertMode); - } - void Process(const JEvent& event) { - auto outputs = event.Get(); - REQUIRE(outputs.size() == 4); - REQUIRE(outputs[0]->z == 25.6f); - REQUIRE(outputs[1]->z == 26.5f); - REQUIRE(outputs[2]->z == 27.4f); - REQUIRE(outputs[3]->z == 28.3f); - } - }; - - SECTION("Execute subevent arrows end-to-end") { - - JApplication app; - app.SetTimeoutEnabled(false); - app.SetTicker(false); - - auto topology = app.GetService(); - topology->set_configure_fn([&](JTopologyBuilder& topology) { - - auto source_arrow = new JEventSourceArrow("simpleSource", {new SimpleSource}); - source_arrow->attach(topology.event_pool, JEventSourceArrow::EVENT_IN); - source_arrow->attach(&events_in, JEventSourceArrow::EVENT_OUT); - - auto proc_arrow = new JEventMapArrow("simpleProcessor"); - proc_arrow->attach(&events_out, 0); - proc_arrow->attach(topology.event_pool, 1); - proc_arrow->add_processor(new SimpleProcessor); - - topology.arrows.push_back(source_arrow); - topology.arrows.push_back(split_arrow); - topology.arrows.push_back(subprocess_arrow); - topology.arrows.push_back(merge_arrow); - topology.arrows.push_back(proc_arrow); - - source_arrow->attach(split_arrow); - split_arrow->attach(subprocess_arrow); - subprocess_arrow->attach(merge_arrow); - merge_arrow->attach(proc_arrow); - }); - - app.Run(true); - } - - -} - -