7 changes: 1 addition & 6 deletions CMakeLists.txt
@@ -220,18 +220,13 @@ include(cmake/AddJanaTest.cmake)
add_subdirectory(src/external)
add_subdirectory(src/libraries/JANA)
add_subdirectory(src/plugins)
add_subdirectory(src/programs/jana)
add_subdirectory(src/programs)
add_subdirectory(src/python)

if (${BUILD_EXAMPLES})
add_subdirectory(src/examples)
endif()

if (${BUILD_TESTS})
add_subdirectory(src/programs/unit_tests)
add_subdirectory(src/programs/perf_tests)
endif()


#---------------------------------------------------------------------------------------

72 changes: 72 additions & 0 deletions docs/offloading.md
@@ -0,0 +1,72 @@




Work package: GPU arrows
------------------------

Problem: Arrows can only have _one_ next_input at any given time
Solution:
- GPU tasks can all be pushed by GPUArrow to _one_ queue per physical GPU
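
A minimal sketch of the "one queue per physical GPU" idea, assuming a plain mutex-protected FIFO per device. `GpuTask`, `PerGpuQueues`, `Push` and `Pop` are hypothetical names invented for illustration and are not part of JANA; the real payload would presumably be a `JEvent*`.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical task record; a real implementation would carry a JEvent*.
struct GpuTask {
    std::string factory_name;  // Factory that needs the GPU
    std::size_t event_nr;      // Stand-in for the event itself
};

// Hypothetical: one FIFO queue per physical GPU, shared by every GPU arrow.
class PerGpuQueues {
    std::mutex m_mutex;
    std::vector<std::deque<GpuTask>> m_queues;

public:
    explicit PerGpuQueues(std::size_t gpu_count) : m_queues(gpu_count) {}

    // Called by any arrow that wants to offload work to the given GPU.
    void Push(std::size_t gpu_id, GpuTask task) {
        assert(gpu_id < m_queues.size());
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queues[gpu_id].push_back(std::move(task));
    }

    // Called by the consumer for that GPU; an empty optional means no work.
    std::optional<GpuTask> Pop(std::size_t gpu_id) {
        assert(gpu_id < m_queues.size());
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queues[gpu_id].empty()) return std::nullopt;
        GpuTask task = std::move(m_queues[gpu_id].front());
        m_queues[gpu_id].pop_front();
        return task;
    }
};
```

Because every producer funnels into the same queue for a given device, the consumer only ever has one input to watch, which is the constraint stated in the problem above.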

Design:
- GPUMapArrow
- Knows which factory needs the GPU
- Knows which factories need to run upstream
- Knows how to pack and unpack the offloaded data

- GPUTapArrow
- Knows what to do with the packed input data. How does it know this?
- Doesn't worry about pipelining the memory movement for now

- GPUFactory
- I'm hoping we can reuse Inputs and Outputs for pack() and unpack(), because
they need to be in the JEvent _somehow_


Problem: GPUTapArrow doesn't know what to do! Needs some kind of indication of
- What factory needs to be run on this thing?
- What arrow (what queue, technically) does the JEvent need to be returned to?

One reason this is a problem is that JEvent databundles are supposed to
be immutable, so this cannot be a factory output. It probably has to be a separate
field on the JEvent. Who sets this field, though? And via what interface?

Note that the GPUMapArrow knows the factory and output queue (which MIGHT just be
its own input queue. Is this always the case?). Maybe GPUMapArrow should set this.

So what should this be? (databundle_name, next_input_queue).
Quickly check whether next_input_queue is an arrow-local port index or a topology-wide queue id.
- Sadly it is an arrow-local port index
- The queue doesn't know its own id
- So it's the responsibility of the TopologyBuilder to inform the GPUTapArrow what the output queue index is
- OR we only store the factoryname, and the GPUTapArrow looks up the correct port index for the given factoryname
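
The second option (store only the factory name and let the tap arrow look up the port) could be sketched roughly as follows. Everything here is an assumption made for illustration: `GpuContinuation`, `SketchEvent` and `SketchGpuTapRouter` are hypothetical stand-ins, not existing JANA types, and the continuation is modelled as a separate optional field on the event rather than as a databundle.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <unordered_map>

// Hypothetical continuation: only the factory name is stored on the event.
struct GpuContinuation {
    std::string factory_name;  // Which factory should run on the GPU
};

// Hypothetical stand-in for JEvent, reduced to the one field discussed here.
struct SketchEvent {
    std::optional<GpuContinuation> continuation;
};

// Hypothetical stand-in for the lookup a GPUTapArrow would perform.
class SketchGpuTapRouter {
    // Maps factory name -> arrow-local output port index.
    std::unordered_map<std::string, std::size_t> m_port_by_factory;

public:
    // Would be called while the topology is being built.
    void RegisterFactory(const std::string& factory_name, std::size_t port_index) {
        assert(!factory_name.empty());
        m_port_by_factory[factory_name] = port_index;
    }

    // Returns the port the event should be returned on, or an empty optional
    // if the event has no continuation or the factory is unknown.
    std::optional<std::size_t> FindReturnPort(const SketchEvent& event) const {
        if (!event.continuation) return std::nullopt;
        auto it = m_port_by_factory.find(event.continuation->factory_name);
        if (it == m_port_by_factory.end()) return std::nullopt;
        return it->second;
    }
};
```

With this shape, the event never needs to carry a port index, so the fact that port indices are arrow-local stops mattering; the cost is that the tap arrow must be told about every factory it may be asked to run.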

Problem:
I'm not sure if the concept of a continuation name is general enough to stick onto JEvent. Let's
think about the calibration workflow a little more. That might require re-running factories that have already run,
which poses a problem for immutability, but which we might be able to solve via event levels.


Suppose I unroll the loop and avoid having continuations. What a relief! Now each GPUTapArrow knows exactly what to do.
However, now I need to do a lot more in my JTopologyBuilder (to figure out this unroll) and my scheduler needs to enforce
mutual exclusion _across_ arrows. Both of these are very unpleasant to implement. I might need them eventually. But not today.


Resuming this!
Goal: Check whether I call Preprocess on a JFactory















@@ -144,7 +144,7 @@ int main() {
source_arrow->set_input(topology->event_pool);
source_arrow->set_output(&events_in);

auto proc_arrow = new JEventMapArrow("simpleProcessor");
auto proc_arrow = new JMapArrow("simpleProcessor");
proc_arrow->set_input(&events_out);
proc_arrow->set_output(topology->event_pool);
proc_arrow->add_processor(new SimpleProcessor);
6 changes: 3 additions & 3 deletions src/examples/misc/SubeventExample/SubeventExample.cc
@@ -7,8 +7,8 @@
#include <JANA/JEventSource.h>
#include <JANA/JEventProcessor.h>

#include <JANA/Topology/JEventSourceArrow.h>
#include <JANA/Topology/JEventMapArrow.h>
#include <JANA/Topology/JSourceArrow.h>
#include <JANA/Topology/JMapArrow.h>
#include <JANA/Topology/JSubeventArrow.h>
#include <JANA/Topology/JTopologyBuilder.h>

@@ -111,7 +111,7 @@ int main() {
source_arrow->attach(topology->event_pool, 0);
source_arrow->attach(&events_in, 1);

auto proc_arrow = new JEventMapArrow("simpleProcessor");
auto proc_arrow = new JMapArrow("simpleProcessor");
proc_arrow->attach(&events_out, 0);
proc_arrow->attach(topology->event_pool, 1);
proc_arrow->add_processor(new SimpleProcessor);
6 changes: 3 additions & 3 deletions src/libraries/JANA/CMakeLists.txt
@@ -17,9 +17,9 @@ set(JANA2_SOURCES

Topology/JArrow.cc
Topology/JEventPool.cc
Topology/JEventSourceArrow.cc
Topology/JEventMapArrow.cc
Topology/JEventTapArrow.cc
Topology/JSourceArrow.cc
Topology/JMapArrow.cc
Topology/JTapArrow.cc
Topology/JMultilevelSourceArrow.cc
Topology/JTopologyBuilder.cc

74 changes: 38 additions & 36 deletions src/libraries/JANA/Engine/JExecutionEngine.cc
@@ -55,16 +55,16 @@ void JExecutionEngine::Init() {
// Not sure how I feel about putting this here yet, but I think it will at least work in both cases it needs to.
// The reason this works is because JTopologyBuilder::create_topology() has already been called before
// JApplication::ProvideService<JExecutionEngine>().
for (JArrow* arrow : m_topology->arrows) {
for (JArrow* arrow : m_topology->GetArrows()) {

arrow->initialize();
arrow->Initialize();

m_arrow_states.emplace_back();
auto& arrow_state = m_arrow_states.back();
arrow_state.is_source = arrow->is_source();
arrow_state.is_sink = arrow->is_sink();
arrow_state.is_parallel = arrow->is_parallel();
arrow_state.next_input = arrow->get_next_port_index();
arrow_state.is_source = arrow->IsSource();
arrow_state.is_sink = arrow->IsSink();
arrow_state.is_parallel = arrow->IsParallel();
arrow_state.next_input = arrow->GetNextPortIndex();
}
}

@@ -125,17 +125,18 @@ void JExecutionEngine::ScaleWorkers(size_t nthreads) {
if (prev_nthreads < nthreads) {
// We are launching additional worker threads
LOG_DEBUG(GetLogger()) << "Scaling up to " << nthreads << " worker threads" << LOG_END;
auto mapping = m_topology->GetProcessorMapping();
for (size_t worker_id=prev_nthreads; worker_id < nthreads; ++worker_id) {
auto worker = std::make_unique<WorkerState>();
worker->worker_id = worker_id;
worker->is_stop_requested = false;
worker->cpu_id = m_topology->mapping.get_cpu_id(worker_id);
worker->location_id = m_topology->mapping.get_loc_id(worker_id);
worker->cpu_id = mapping.get_cpu_id(worker_id);
worker->location_id = mapping.get_loc_id(worker_id);
worker->thread = new std::thread(&JExecutionEngine::RunWorker, this, Worker{worker_id, &worker->backtrace});
LOG_DEBUG(GetLogger()) << "Launching worker thread " << worker_id << " on cpu=" << worker->cpu_id << ", location=" << worker->location_id << LOG_END;
m_worker_states.push_back(std::move(worker));

bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
bool pin_to_cpu = (mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
if (pin_to_cpu) {
JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
}
@@ -316,14 +317,14 @@ void JExecutionEngine::HandleFailures() {
// First, we log all of the failures we've found
for (auto& worker: m_worker_states) {
if (worker->is_timed_out) {
std::string arrow_name = (worker->last_arrow_id == static_cast<uint64_t>(-1)) ? "(none)" : m_topology->arrows[worker->last_arrow_id]->get_name();
std::string arrow_name = (worker->last_arrow_id == static_cast<uint64_t>(-1)) ? "(none)" : m_topology->GetArrows()[worker->last_arrow_id]->GetName();
LOG_FATAL(GetLogger()) << "Timeout in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
pthread_kill(worker->thread->native_handle(), SIGUSR2);
LOG_INFO(GetLogger()) << "Worker thread signalled; waiting for backtrace capture." << LOG_END;
worker->backtrace.WaitForCapture();
}
if (worker->stored_exception != nullptr) {
std::string arrow_name = (worker->last_arrow_id == static_cast<uint64_t>(-1)) ? "(none)" : m_topology->arrows[worker->last_arrow_id]->get_name();
std::string arrow_name = (worker->last_arrow_id == static_cast<uint64_t>(-1)) ? "(none)" : m_topology->GetArrows()[worker->last_arrow_id]->GetName();
LOG_FATAL(GetLogger()) << "Exception in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
}
}
@@ -349,10 +350,10 @@ void JExecutionEngine::FinishTopology() {
assert(m_runstatus == RunStatus::Paused);

LOG_DEBUG(GetLogger()) << "Finishing processing..." << LOG_END;
for (auto* arrow : m_topology->arrows) {
arrow->finalize();
for (auto* arrow : m_topology->GetArrows()) {
arrow->Finalize();
}
for (auto* pool: m_topology->pools) {
for (auto* pool: m_topology->GetPools()) {
pool->Finalize();
}
m_runstatus = RunStatus::Finished;
@@ -391,16 +392,17 @@ JExecutionEngine::Perf JExecutionEngine::GetPerf() {

JExecutionEngine::Worker JExecutionEngine::RegisterWorker() {
std::unique_lock<std::mutex> lock(m_mutex);
auto mapping = m_topology->GetProcessorMapping();
auto worker_id = m_worker_states.size();
auto worker = std::make_unique<WorkerState>();
worker->worker_id = worker_id;
worker->is_stop_requested = false;
worker->cpu_id = m_topology->mapping.get_cpu_id(worker_id);
worker->location_id = m_topology->mapping.get_loc_id(worker_id);
worker->cpu_id = mapping.get_cpu_id(worker_id);
worker->location_id = mapping.get_loc_id(worker_id);
worker->thread = nullptr;
m_worker_states.push_back(std::move(worker));

bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
bool pin_to_cpu = (mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
if (pin_to_cpu) {
JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
}
@@ -419,7 +421,7 @@ void JExecutionEngine::RunWorker(Worker worker) {
while (true) {
ExchangeTask(task, worker.worker_id);
if (task.arrow == nullptr) break; // Exit as soon as ExchangeTask() stops blocking
task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
task.arrow->Fire(task.input_event, task.outputs, task.output_count, task.status);
}
LOG_DEBUG(GetLogger()) << "Stopped worker thread " << worker.worker_id << LOG_END;
}
@@ -493,13 +495,13 @@ void JExecutionEngine::CheckinCompletedTask_Unsafe(Task& task, WorkerState& work
arrow_state.total_processing_duration += processing_duration;

for (size_t output=0; output<task.output_count; ++output) {
if (!task.arrow->get_port(task.outputs[output].second).is_input) {
if (!task.arrow->GetPort(task.outputs[output].second).GetSkipFinishEvent()) {
arrow_state.events_processed++;
}
}

// Put each output in its correct queue or pool
task.arrow->push(task.outputs, task.output_count, worker.location_id);
task.arrow->Push(task.outputs, task.output_count, worker.location_id);

if (task.status == JArrow::FireResult::Finished) {
// If this is an eventsource self-terminating (the only thing that returns Status::Finished right now) it will
@@ -558,10 +560,10 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker)
// TODO: Support next_visit_time so that we don't hammer blocked event sources

// See if we can obtain an input event (this is silly)
JArrow* arrow = m_topology->arrows[arrow_id];
// TODO: consider setting state.next_input, retrieving via fire()
auto port = arrow->get_next_port_index();
JEvent* event = (port == -1) ? nullptr : arrow->pull(port, worker.location_id);
JArrow* arrow = m_topology->GetArrows()[arrow_id];
// TODO: consider setting state.next_input, retrieving via Fire()
auto port = arrow->GetNextPortIndex();
JEvent* event = (port == -1) ? nullptr : arrow->Pull(port, worker.location_id);
if (event != nullptr || port == -1) {
LOG_TRACE(GetLogger()) << "Scheduler: Found next ready arrow with id " << arrow_id << LOG_END;
// We've found a task that is ready!
@@ -657,18 +659,18 @@ void JExecutionEngine::PrintFinalReport() {
size_t total_useful_ms = 0;

for (size_t arrow_id=0; arrow_id < m_arrow_states.size(); ++arrow_id) {
auto* arrow = m_topology->arrows[arrow_id];
auto* arrow = m_topology->GetArrows()[arrow_id];
auto& arrow_state = m_arrow_states[arrow_id];
auto useful_ms = std::chrono::duration_cast<std::chrono::milliseconds>(arrow_state.total_processing_duration).count();
total_useful_ms += useful_ms;
auto avg_latency = useful_ms*1.0/arrow_state.events_processed;
auto throughput_bottleneck = 1000.0 / avg_latency;
if (arrow->is_parallel()) {
if (arrow->IsParallel()) {
throughput_bottleneck *= thread_count;
}

LOG_INFO(GetLogger()) << " - Arrow name: " << arrow->get_name() << LOG_END;
LOG_INFO(GetLogger()) << " Parallel: " << arrow->is_parallel() << LOG_END;
LOG_INFO(GetLogger()) << " - Arrow name: " << arrow->GetName() << LOG_END;
LOG_INFO(GetLogger()) << " Parallel: " << arrow->IsParallel() << LOG_END;
LOG_INFO(GetLogger()) << " Events completed: " << arrow_state.events_processed << LOG_END;
LOG_INFO(GetLogger()) << " Avg latency [ms/event]: " << avg_latency << LOG_END;
LOG_INFO(GetLogger()) << " Throughput bottleneck [Hz]: " << throughput_bottleneck << LOG_END;
@@ -708,12 +710,12 @@ bool JExecutionEngine::IsTimeoutEnabled() const {
JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {

std::unique_lock<std::mutex> lock(m_mutex);
if (arrow_id >= m_topology->arrows.size()) {
if (arrow_id >= m_topology->GetArrows().size()) {
LOG_WARN(GetLogger()) << "Firing unsuccessful: No arrow exists with id=" << arrow_id << LOG_END;
return JArrow::FireResult::NotRunYet;
}
JArrow* arrow = m_topology->arrows[arrow_id];
LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->get_name()
JArrow* arrow = m_topology->GetArrows()[arrow_id];
LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->GetName()
<< ", index=" << arrow_id << ", location=" << location_id << LOG_END;

ArrowState& arrow_state = m_arrow_states[arrow_id];
@@ -727,10 +729,10 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {
}
arrow_state.active_tasks += 1;

auto port = arrow->get_next_port_index();
auto port = arrow->GetNextPortIndex();
JEvent* event = nullptr;
if (port != -1) {
event = arrow->pull(port, location_id);
event = arrow->Pull(port, location_id);
if (event == nullptr) {
LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END;
arrow_state.active_tasks -= 1;
@@ -750,8 +752,8 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {
JArrow::FireResult result = JArrow::FireResult::NotRunYet;

LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END;
arrow->fire(event, outputs, output_count, result);
LOG_WARN(GetLogger()) << "Fired arrow with result " << to_string(result) << LOG_END;
arrow->Fire(event, outputs, output_count, result);
LOG_WARN(GetLogger()) << "Fired arrow with result " << ToString(result) << LOG_END;
if (output_count == 0) {
LOG_WARN(GetLogger()) << "No output events" << LOG_END;
}
@@ -762,7 +764,7 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {
}

lock.lock();
arrow->push(outputs, output_count, location_id);
arrow->Push(outputs, output_count, location_id);
arrow_state.active_tasks -= 1;
lock.unlock();
return result;
2 changes: 1 addition & 1 deletion src/libraries/JANA/JApplication.cc
@@ -156,7 +156,7 @@ void JApplication::Initialize() {
m_desired_nthreads = JCpuInfo::GetNumCpus();
}

topology_builder->create_topology();
topology_builder->CreateTopology();

m_params->SetDefaultParameter("jana:inspect", m_inspect, "Controls whether to drop immediately into the Inspector upon Run()");

2 changes: 1 addition & 1 deletion src/libraries/JANA/JEventSource.h
@@ -78,7 +78,7 @@ class JEventSource : public jana::components::JComponent,


/// For work that should be done in parallel on a JEvent, but is tightly coupled to the JEventSource for some reason.
/// Called after Emit() by JEventMapArrow, but only if EnableProcessParallel(true) is set. Note that the JEvent& is not
/// Called after Emit() by JMapArrow, but only if EnableProcessParallel(true) is set. Note that the JEvent& is not
/// const here, because we need to be able to call event.Insert() from here. Also note that `this` IS const, because
/// it is not safe to access any state in parallel from here. Note that this includes things like calibration constants.
/// If you need to safely access state, use a JFactory instead.