Skip to content

Commit dd12ee6

Browse files
committed
move the responsibility to provide TFN/TFF messages to the injecting tasks
1 parent 36c35ca commit dd12ee6

File tree

4 files changed

+74
-68
lines changed

4 files changed

+74
-68
lines changed

run/dpl_eventgen.cxx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ struct GeneratorTask {
119119
}
120120

121121
// report number of TFs injected for the rate limiter to work
122-
++tfCounter;
122+
pc.outputs().snapshot(Output{"TFF", "TFFilename", 0}, "");
123+
pc.outputs().snapshot(Output{"TFN", "TFNumber", 0}, ++tfCounter);
123124
pc.services().get<o2::monitoring::Monitoring>().send(o2::monitoring::Metric{(uint64_t)tfCounter, "df-sent"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
124125

125126
bool time_expired = false;
@@ -150,6 +151,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
150151
auto spec = adaptAnalysisTask<GeneratorTask>(cfgc);
151152
spec.outputs.emplace_back("MC", "MCHEADER", 0, Lifetime::Timeframe);
152153
spec.outputs.emplace_back("MC", "MCTRACKS", 0, Lifetime::Timeframe);
154+
spec.outputs.emplace_back("TFF", "TFFilename");
155+
spec.outputs.emplace_back("TFN", "TFNumber");
153156
spec.requiredServices.push_back(o2::framework::ArrowSupport::arrowBackendSpec());
154157
spec.algorithm = CommonDataProcessors::wrapWithRateLimiting(spec.algorithm);
155158
return {spec};

run/o2sim_hepmc_publisher.cxx

Lines changed: 52 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ struct O2simHepmcPublisher {
3737
int tfCounter = 0;
3838
std::shared_ptr<HepMC3::Reader> hepMCReader;
3939
bool eos = false;
40-
std::vector<o2::MCTrack> mcTracks;
40+
41+
std::vector<o2::pmr::vector<o2::MCTrack>*> mctracks_vector;
42+
std::vector<o2::dataformats::MCEventHeader*> mcheader_vector;
4143

4244
void init(o2::framework::InitContext& /*ic*/)
4345
{
@@ -50,13 +52,19 @@ struct O2simHepmcPublisher {
5052
LOGP(fatal, "Cannot open HEPMC kine file {}", (std::string)hepmcFileName);
5153
}
5254
// allocate the memory upfront to prevent reallocations later
53-
mcTracks.reserve(1e3 * aggregate);
55+
mctracks_vector.reserve(aggregate);
56+
mcheader_vector.reserve(aggregate);
5457
}
5558

5659
void run(o2::framework::ProcessingContext& pc)
5760
{
5861
HepMC3::GenEvent event;
59-
for (auto i = 0; i < (int)aggregate; ++i) {
62+
auto batch = maxEvents > 0 ? std::min((int)aggregate, (int)maxEvents - eventCounter) : (int)aggregate;
63+
for (auto i = 0; i < batch; ++i) {
64+
mctracks_vector.push_back(&pc.outputs().make<o2::pmr::vector<o2::MCTrack>>(Output{"MC", "MCTRACKS", 0}));
65+
auto& mctracks = mctracks_vector.back();
66+
mcheader_vector.push_back(&pc.outputs().make<o2::dataformats::MCEventHeader>(Output{"MC", "MCHEADER", 0}));
67+
auto& mcheader = mcheader_vector.back();
6068
// read next entry
6169
hepMCReader->read_event(event);
6270
if (hepMCReader->failed()) {
@@ -66,61 +74,60 @@ struct O2simHepmcPublisher {
6674
}
6775

6876
// create O2 MCHeader and MCtracks vector out of HEPMC event
69-
o2::dataformats::MCEventHeader mcHeader;
70-
mcHeader.SetEventID(event.event_number());
71-
mcHeader.SetVertex(event.event_pos().px(), event.event_pos().py(), event.event_pos().pz());
77+
mcheader->SetEventID(event.event_number());
78+
mcheader->SetVertex(event.event_pos().px(), event.event_pos().py(), event.event_pos().pz());
7279
auto xsecInfo = event.cross_section();
7380
if (xsecInfo != nullptr) {
74-
mcHeader.putInfo(MCInfoKeys::acceptedEvents, (uint64_t)xsecInfo->get_accepted_events());
75-
mcHeader.putInfo(MCInfoKeys::attemptedEvents, (uint64_t)xsecInfo->get_attempted_events());
76-
mcHeader.putInfo(MCInfoKeys::xSection, (float)xsecInfo->xsec());
77-
mcHeader.putInfo(MCInfoKeys::xSectionError, (float)xsecInfo->xsec_err());
81+
mcheader->putInfo(MCInfoKeys::acceptedEvents, (uint64_t)xsecInfo->get_accepted_events());
82+
mcheader->putInfo(MCInfoKeys::attemptedEvents, (uint64_t)xsecInfo->get_attempted_events());
83+
mcheader->putInfo(MCInfoKeys::xSection, (float)xsecInfo->xsec());
84+
mcheader->putInfo(MCInfoKeys::xSectionError, (float)xsecInfo->xsec_err());
7885
}
7986
auto scale = event.attribute<HepMC3::DoubleAttribute>(MCInfoKeys::eventScale);
8087
if (scale != nullptr) {
81-
mcHeader.putInfo(MCInfoKeys::eventScale, (float)scale->value());
88+
mcheader->putInfo(MCInfoKeys::eventScale, (float)scale->value());
8289
}
8390
auto nMPI = event.attribute<HepMC3::IntAttribute>(MCInfoKeys::mpi);
8491
if (nMPI != nullptr) {
85-
mcHeader.putInfo(MCInfoKeys::mpi, nMPI->value());
92+
mcheader->putInfo(MCInfoKeys::mpi, nMPI->value());
8693
}
8794
auto sid = event.attribute<HepMC3::IntAttribute>(MCInfoKeys::processCode);
8895
auto scode = event.attribute<HepMC3::IntAttribute>(MCInfoKeys::processID); // default pythia8 hepmc3 interface uses signal_process_id
8996
if (sid != nullptr) {
90-
mcHeader.putInfo(MCInfoKeys::processCode, sid->value());
97+
mcheader->putInfo(MCInfoKeys::processCode, sid->value());
9198
} else if (scode != nullptr) {
92-
mcHeader.putInfo(MCInfoKeys::processCode, scode->value());
99+
mcheader->putInfo(MCInfoKeys::processCode, scode->value());
93100
}
94101
auto pdfInfo = event.pdf_info();
95102
if (pdfInfo != nullptr) {
96-
mcHeader.putInfo(MCInfoKeys::pdfParton1Id, pdfInfo->parton_id[0]);
97-
mcHeader.putInfo(MCInfoKeys::pdfParton2Id, pdfInfo->parton_id[1]);
98-
mcHeader.putInfo(MCInfoKeys::pdfCode1, pdfInfo->pdf_id[0]);
99-
mcHeader.putInfo(MCInfoKeys::pdfCode2, pdfInfo->pdf_id[1]);
100-
mcHeader.putInfo(MCInfoKeys::pdfX1, (float)pdfInfo->x[0]);
101-
mcHeader.putInfo(MCInfoKeys::pdfX2, (float)pdfInfo->x[1]);
102-
mcHeader.putInfo(MCInfoKeys::pdfScale, (float)pdfInfo->scale);
103-
mcHeader.putInfo(MCInfoKeys::pdfXF1, (float)pdfInfo->xf[0]);
104-
mcHeader.putInfo(MCInfoKeys::pdfXF2, (float)pdfInfo->xf[1]);
103+
mcheader->putInfo(MCInfoKeys::pdfParton1Id, pdfInfo->parton_id[0]);
104+
mcheader->putInfo(MCInfoKeys::pdfParton2Id, pdfInfo->parton_id[1]);
105+
mcheader->putInfo(MCInfoKeys::pdfCode1, pdfInfo->pdf_id[0]);
106+
mcheader->putInfo(MCInfoKeys::pdfCode2, pdfInfo->pdf_id[1]);
107+
mcheader->putInfo(MCInfoKeys::pdfX1, (float)pdfInfo->x[0]);
108+
mcheader->putInfo(MCInfoKeys::pdfX2, (float)pdfInfo->x[1]);
109+
mcheader->putInfo(MCInfoKeys::pdfScale, (float)pdfInfo->scale);
110+
mcheader->putInfo(MCInfoKeys::pdfXF1, (float)pdfInfo->xf[0]);
111+
mcheader->putInfo(MCInfoKeys::pdfXF2, (float)pdfInfo->xf[1]);
105112
}
106113
auto heavyIon = event.heavy_ion();
107114
if (heavyIon != nullptr) {
108-
mcHeader.putInfo(MCInfoKeys::nCollHard, heavyIon->Ncoll_hard);
109-
mcHeader.putInfo(MCInfoKeys::nPartProjectile, heavyIon->Npart_proj);
110-
mcHeader.putInfo(MCInfoKeys::nPartTarget, heavyIon->Npart_targ);
111-
mcHeader.putInfo(MCInfoKeys::nColl, heavyIon->Ncoll);
112-
mcHeader.putInfo(MCInfoKeys::nCollNNWounded, heavyIon->N_Nwounded_collisions);
113-
mcHeader.putInfo(MCInfoKeys::nCollNWoundedN, heavyIon->Nwounded_N_collisions);
114-
mcHeader.putInfo(MCInfoKeys::nCollNWoundedNwounded, heavyIon->Nwounded_Nwounded_collisions);
115-
mcHeader.putInfo(MCInfoKeys::nSpecProjectileNeutron, heavyIon->Nspec_proj_n);
116-
mcHeader.putInfo(MCInfoKeys::nSpecProjectileProton, heavyIon->Nspec_proj_p);
117-
mcHeader.putInfo(MCInfoKeys::nSpecTargetNeutron, heavyIon->Nspec_targ_n);
118-
mcHeader.putInfo(MCInfoKeys::nSpecTargetProton, heavyIon->Nspec_targ_p);
119-
mcHeader.putInfo(MCInfoKeys::impactParameter, (float)heavyIon->impact_parameter);
120-
mcHeader.putInfo(MCInfoKeys::planeAngle, (float)heavyIon->event_plane_angle);
121-
mcHeader.putInfo("eccentricity", (float)heavyIon->eccentricity);
122-
mcHeader.putInfo(MCInfoKeys::sigmaInelNN, (float)heavyIon->sigma_inel_NN);
123-
mcHeader.putInfo(MCInfoKeys::centrality, (float)heavyIon->centrality);
115+
mcheader->putInfo(MCInfoKeys::nCollHard, heavyIon->Ncoll_hard);
116+
mcheader->putInfo(MCInfoKeys::nPartProjectile, heavyIon->Npart_proj);
117+
mcheader->putInfo(MCInfoKeys::nPartTarget, heavyIon->Npart_targ);
118+
mcheader->putInfo(MCInfoKeys::nColl, heavyIon->Ncoll);
119+
mcheader->putInfo(MCInfoKeys::nCollNNWounded, heavyIon->N_Nwounded_collisions);
120+
mcheader->putInfo(MCInfoKeys::nCollNWoundedN, heavyIon->Nwounded_N_collisions);
121+
mcheader->putInfo(MCInfoKeys::nCollNWoundedNwounded, heavyIon->Nwounded_Nwounded_collisions);
122+
mcheader->putInfo(MCInfoKeys::nSpecProjectileNeutron, heavyIon->Nspec_proj_n);
123+
mcheader->putInfo(MCInfoKeys::nSpecProjectileProton, heavyIon->Nspec_proj_p);
124+
mcheader->putInfo(MCInfoKeys::nSpecTargetNeutron, heavyIon->Nspec_targ_n);
125+
mcheader->putInfo(MCInfoKeys::nSpecTargetProton, heavyIon->Nspec_targ_p);
126+
mcheader->putInfo(MCInfoKeys::impactParameter, (float)heavyIon->impact_parameter);
127+
mcheader->putInfo(MCInfoKeys::planeAngle, (float)heavyIon->event_plane_angle);
128+
mcheader->putInfo("eccentricity", (float)heavyIon->eccentricity);
129+
mcheader->putInfo(MCInfoKeys::sigmaInelNN, (float)heavyIon->sigma_inel_NN);
130+
mcheader->putInfo(MCInfoKeys::centrality, (float)heavyIon->centrality);
124131
}
125132

126133
auto particles = event.particles();
@@ -131,26 +138,22 @@ struct O2simHepmcPublisher {
131138
auto has_children = children.size() > 0;
132139
auto p = particle->momentum();
133140
auto v = particle->production_vertex();
134-
mcTracks.emplace_back(
141+
mctracks->emplace_back(
135142
particle->pid(),
136143
has_parents ? parents.front()->id() : -1, has_parents ? parents.back()->id() : -1,
137144
has_children ? children.front()->id() : -1, has_children ? children.back()->id() : -1,
138145
p.px(), p.py(), p.pz(),
139146
v->position().x(), v->position().y(), v->position().z(),
140147
v->position().t(), 0);
141148
}
142-
143-
// add to the message
144-
pc.outputs().snapshot(Output{"MC", "MCHEADER", 0}, mcHeader);
145-
pc.outputs().snapshot(Output{"MC", "MCTRACKS", 0}, mcTracks);
146-
mcTracks.clear();
147149
++eventCounter;
148150
}
149151

150152
// report number of TFs injected for the rate limiter to work
151-
++tfCounter;
153+
pc.outputs().snapshot(Output{"TFF", "TFFilename", 0}, "");
154+
pc.outputs().snapshot(Output{"TFN", "TFNumber", 0}, ++tfCounter);
152155
pc.services().get<o2::monitoring::Monitoring>().send(o2::monitoring::Metric{(uint64_t)tfCounter, "df-sent"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
153-
if (eos || (maxEvents > 0 && eventCounter == maxEvents)) {
156+
if (eos || (maxEvents > 0 && eventCounter >= maxEvents)) {
154157
pc.services().get<ControlService>().endOfStream();
155158
pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
156159
}
@@ -162,6 +165,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
162165
auto spec = adaptAnalysisTask<O2simHepmcPublisher>(cfgc);
163166
spec.outputs.emplace_back("MC", "MCHEADER", 0, Lifetime::Timeframe);
164167
spec.outputs.emplace_back("MC", "MCTRACKS", 0, Lifetime::Timeframe);
168+
spec.outputs.emplace_back("TFF", "TFFilename");
169+
spec.outputs.emplace_back("TFN", "TFNumber");
165170
spec.requiredServices.push_back(o2::framework::ArrowSupport::arrowBackendSpec());
166171
spec.algorithm = CommonDataProcessors::wrapWithRateLimiting(spec.algorithm);
167172
return {spec};

run/o2sim_kine_publisher.cxx

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,17 @@ struct O2simKinePublisher {
4040

4141
void run(o2::framework::ProcessingContext& pc)
4242
{
43-
for (auto i = 0; i < std::min((int)aggregate, nEvents - eventCounter); ++i) {
43+
auto batch = std::min((int)aggregate, nEvents - eventCounter);
44+
for (auto i = 0; i < batch; ++i) {
4445
auto mcevent = mcKinReader->getMCEventHeader(0, eventCounter);
4546
auto mctracks = mcKinReader->getTracks(0, eventCounter);
4647
pc.outputs().snapshot(Output{"MC", "MCHEADER", 0}, mcevent);
4748
pc.outputs().snapshot(Output{"MC", "MCTRACKS", 0}, mctracks);
4849
++eventCounter;
4950
}
5051
// report number of TFs injected for the rate limiter to work
51-
++tfCounter;
52+
pc.outputs().snapshot(Output{"TFF", "TFFilename", 0}, "");
53+
pc.outputs().snapshot(Output{"TFN", "TFNumber", 0}, ++tfCounter);
5254
pc.services().get<o2::monitoring::Monitoring>().send(o2::monitoring::Metric{(uint64_t)tfCounter, "df-sent"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
5355
if (eventCounter >= nEvents) {
5456
pc.services().get<ControlService>().endOfStream();
@@ -62,6 +64,9 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
6264
auto spec = adaptAnalysisTask<O2simKinePublisher>(cfgc);
6365
spec.outputs.emplace_back("MC", "MCHEADER", 0, Lifetime::Timeframe);
6466
spec.outputs.emplace_back("MC", "MCTRACKS", 0, Lifetime::Timeframe);
67+
spec.outputs.emplace_back("TFF", "TFFilename");
68+
spec.outputs.emplace_back("TFN", "TFNumber");
69+
6570
spec.requiredServices.push_back(o2::framework::ArrowSupport::arrowBackendSpec());
6671
spec.algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(spec.algorithm);
6772
return {spec};

run/o2sim_mctracks_to_aod.cxx

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,8 @@ struct MctracksToAod {
4949
"Interaction rate to simulate"};
5050
Configurable<bool> filt{"filter-mctracks", false,
5151
"Filter tracks"};
52-
Configurable<uint64_t> tfOffset{"tf-offset", 0, "Start TF counter from an offset"};
5352
/** @} */
5453

55-
/** Number of timeframes */
56-
uint64_t mTimeFrame = 0;
5754
/** Interaction simulation */
5855
InteractionSampler mSampler;
5956

@@ -63,14 +60,15 @@ struct MctracksToAod {
6360
mSampler.setInteractionRate(IR);
6461
mSampler.setFirstIR({0, 0});
6562
mSampler.init();
66-
67-
mTimeFrame = tfOffset;
6863
}
6964

7065
/** Run the conversion */
7166
void run(o2::framework::ProcessingContext& pc)
7267
{
73-
LOG(debug) << "=== Running extended MC AOD exporter ===";
68+
auto tfn = pc.inputs().get<uint64_t>("tfn");
69+
auto tff = pc.inputs().get<std::string>("tff");
70+
LOG(detail) << "File: \"" << tff << "\"; TF = " << tfn;
71+
LOG(detail) << "=== Running extended MC AOD exporter ===";
7472
using namespace o2::aodmchelpers;
7573
using McHeader = o2::dataformats::MCEventHeader;
7674
using McTrack = o2::MCTrack;
@@ -87,33 +85,31 @@ struct MctracksToAod {
8785
<< "number of track vectors: " << nParts
8886
<< " != " << nPartsVerify
8987
<< ", shipping the empty timeframe";
90-
pc.outputs().snapshot(Output{"TFF", "TFFilename", 0}, "");
91-
pc.outputs().snapshot(Output{"TFN", "TFNumber", 0}, ++mTimeFrame);
9288
return;
9389
}
9490
// TODO: include BC simulation
9591
auto bcCounter = 0UL;
9692
size_t offset = 0;
97-
LOG(debug) << "--- Loop over " << nParts << " parts ---";
93+
LOG(detail) << "--- Loop over " << nParts << " parts ---";
9894
for (auto i = 0U; i < nParts; ++i) {
9995
auto record = mSampler.generateCollisionTime();
10096
auto header = pc.inputs().get<McHeader*>("mcheader", i);
10197
auto tracks = pc.inputs().get<McTracks>("mctracks", i);
10298

103-
LOG(debug) << "Updating collision table";
99+
LOG(detail) << "Updating collision table";
104100
auto genID = updateMCCollisions(mCollisions.cursor,
105101
bcCounter,
106102
record.timeInBCNS * 1.e-3,
107103
*header,
108104
0,
109105
i);
110106

111-
LOG(debug) << "Updating HepMC tables";
107+
LOG(detail) << "Updating HepMC tables";
112108
updateHepMCXSection(mXSections.cursor, bcCounter, genID, *header);
113109
updateHepMCPdfInfo(mPdfInfos.cursor, bcCounter, genID, *header);
114110
updateHepMCHeavyIon(mHeavyIons.cursor, bcCounter, genID, *header);
115111

116-
LOG(debug) << "Updating particles table";
112+
LOG(detail) << "Updating particles table";
117113
TrackToIndex preselect;
118114
offset = updateParticles(mParticles.cursor,
119115
bcCounter,
@@ -123,12 +119,9 @@ struct MctracksToAod {
123119
(bool)filt,
124120
false);
125121

126-
LOG(debug) << "Increment BC counter";
122+
LOG(detail) << "Increment BC counter";
127123
bcCounter++;
128124
}
129-
130-
pc.outputs().snapshot(Output{"TFF", "TFFilename", 0}, "");
131-
pc.outputs().snapshot(Output{"TFN", "TFNumber", 0}, ++mTimeFrame);
132125
}
133126
};
134127

@@ -149,8 +142,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
149142
Lifetime::Timeframe);
150143
spec.inputs.emplace_back("mcheader", "MC", "MCHEADER", 0.,
151144
Lifetime::Timeframe);
152-
spec.outputs.emplace_back("TFF", "TFFilename");
153-
spec.outputs.emplace_back("TFN", "TFNumber");
145+
spec.inputs.emplace_back("tfn", "TFN", "TFNumber", 0, Lifetime::Timeframe);
146+
spec.inputs.emplace_back("tff", "TFF", "TFFilename", 0, Lifetime::Timeframe);
154147

155148
return {spec};
156149
}

0 commit comments

Comments
 (0)