Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions k4Reco/Overlay/components/OverlayTimingRandomMix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include <TMath.h>

#include <future>
#include <random>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -257,18 +258,11 @@ retType OverlayTimingRandomMix::operator()(
<< endmsg;

for (int k = 0; k < NOverlay_to_this_BX; ++k) {
info() << "Overlaying background event " << m_bkgEvents->m_nextEntry[groupIndex][k] << " from group " << groupIndex
<< " to BX " << bxInTrain << endmsg;
if (m_bkgEvents->m_nextEntry[groupIndex][k] >= m_bkgEvents->m_totalNumberOfEvents[groupIndex][k] &&
!m_allowReusingBackgroundFiles) {
throw GaudiException("No more events in background file", name(), StatusCode::FAILURE);
}
podio::Reader reader = m_bkgEvents->open(groupIndex, v_file_indices[k]);
debug() << "File: " << m_bkgEvents->m_fileNames[groupIndex][v_file_indices[k]]
<<"\nNumber of Events: "<< reader.getEvents() << endmsg;
const auto backgroundEvent = reader.readEvent(m_bkgEvents->m_nextEntry[groupIndex][k]);
m_bkgEvents->m_nextEntry[groupIndex][k]++;
m_bkgEvents->m_nextEntry[groupIndex][k] %= m_bkgEvents->m_totalNumberOfEvents[groupIndex][k];
podio::Frame backgroundEvent = m_bkgEvents->open(groupIndex, v_file_indices[k]);
const auto availableCollections = backgroundEvent.getAvailableCollections();

// Either 0 or negative
Expand Down
71 changes: 63 additions & 8 deletions k4Reco/Overlay/components/OverlayTimingRandomMix.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,33 @@
#include "Gaudi/Parsers/Factory.h"
#include "Gaudi/Property.h"

#include <condition_variable>
#include <future>
#include <map>
#include <queue>
#include <random>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

namespace OverlayTimingRandomMixNS {
struct EventHolder {
struct Request {
int fileIndex;
int groupIndex;
std::promise<podio::Frame> prom;
};

std::vector<std::vector<std::string>> m_fileNames;
std::vector<std::vector<size_t>> m_totalNumberOfEvents;
std::vector<std::vector<size_t>> m_nextEntry;
std::vector<std::vector<size_t>> m_nextEntry;

std::queue<Request> m_requests; // single queue to serialize all ROOT I/O
std::mutex m_queueMutex;
std::condition_variable m_queueCV;
std::thread m_worker;
bool m_stop{false};

EventHolder(const std::vector<std::vector<std::string>>& fileNames) : m_fileNames(fileNames) {
m_totalNumberOfEvents.resize(m_fileNames.size());
Expand All @@ -69,15 +86,56 @@ namespace OverlayTimingRandomMixNS {
m_totalNumberOfEvents[group].push_back(1);//m_rootFileReaders[group].back().getEntries("events"));
}
}

// Single worker thread to ensure all ROOT I/O happens in one thread (avoids ROOT TFile UUID races)
m_worker = std::thread([this]() {
while (true) {
Request req;
{
std::unique_lock<std::mutex> lock(m_queueMutex);
m_queueCV.wait(lock, [this]() { return m_stop || !m_requests.empty(); });
if (m_stop && m_requests.empty()) {
return;
}
req = std::move(m_requests.front());
m_requests.pop();
}

try {
podio::Reader reader = podio::makeReader(m_fileNames[req.groupIndex][req.fileIndex]);
podio::Frame frame = reader.readEvent(m_nextEntry[req.groupIndex][req.fileIndex]);
m_nextEntry[req.groupIndex][req.fileIndex]++;
m_nextEntry[req.groupIndex][req.fileIndex] %= m_totalNumberOfEvents[req.groupIndex][req.fileIndex];
req.prom.set_value(std::move(frame));
} catch (...) {
req.prom.set_exception(std::current_exception());
}
}
});
}
EventHolder() = default;

podio::Reader open(int groupIndex, int index) {
return podio::makeReader(m_fileNames[groupIndex][index]);
~EventHolder() {
{
std::lock_guard<std::mutex> lock(m_queueMutex);
m_stop = true;
}
m_queueCV.notify_all();
if (m_worker.joinable()) {
m_worker.join();
}
}

// TODO: Cache functionality
// podio::Frame& read
podio::Frame open(int groupIndex, int index) {
Request req{index, groupIndex, std::promise<podio::Frame>()};
auto fut = req.prom.get_future();
{
std::lock_guard<std::mutex> lock(m_queueMutex);
m_requests.push(std::move(req));
}
m_queueCV.notify_one();
return fut.get();
}

size_t size() const { return m_fileNames.size(); }
};
Expand Down Expand Up @@ -144,9 +202,6 @@ struct OverlayTimingRandomMix : public k4FWCore::MultiTransformer<retType(

Gaudi::Property<bool> m_mergeMCParticles{this, "MergeMCParticles", true, "Merge the MC Particle collections"};

// Gaudi::Property<int> m_maxCachedFrames{
// this, "MaxCachedFrames", 0, "Maximum number of frames cached from background files"};

private:
inline static thread_local std::mt19937 m_engine;
SmartIF<IUniqueIDGenSvc> m_uidSvc;
Expand Down
8 changes: 5 additions & 3 deletions k4Reco/Tracking/src/RefitFinal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,11 @@ std::tuple<edm4hep::TrackCollection, edm4hep::TrackMCParticleLinkCollection> Ref
}

// create the Track-MCParticle relation
edm4hep::MutableTrackMCParticleLink relation = trackRelationCollection.create();
relation.setFrom(edm4hep_trk);
relation.setTo(trackIndexToMCParticle.at(static_cast<int>(iTrack)));
if (trackIndexToMCParticle.find(static_cast<int>(iTrack)) != trackIndexToMCParticle.end()) {
edm4hep::MutableTrackMCParticleLink relation = trackRelationCollection.create();
relation.setFrom(edm4hep_trk);
relation.setTo(trackIndexToMCParticle.at(static_cast<int>(iTrack)));
}
} // for loop to the tracks

debug() << "Final number of Tracks after refit = " << trackVec.size()
Expand Down