Reprocessing#7
Conversation
📝 WalkthroughWalkthroughThis PR implements a complete reprocessing-from-RAW subsystem for Tier-0. It adds configuration helpers and an example file, Oracle schema and DAOs for request tracking, DBS/Rucio data staging functions, a multi-state ReprocessingPoller orchestrator that manages the workflow from submission through workload injection and completion, and a Harness component wrapper. Comprehensive unit tests validate configuration, data staging, and state transitions. ChangesReprocessing from RAW feature
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 3❌ Failed checks (2 warnings, 1 inconclusive)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
|
@coderabbitai help |
ChatThere are 3 ways to chat with CodeRabbit:
CodeRabbit commands
Other keywords and placeholders
CodeRabbit configuration file (
|
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 76
What would you like me to do? 😊 (ᵔᴥᵔ)🐇 |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 6
🧹 Nitpick comments (2)
src/python/T0/WMBS/Oracle/Create.py (1)
550-551: ⚡ Quick winUse a composite index to match status polling query order.
Status polling reads
WHERE status = ... ORDER BY created_at; indexing(status, created_at)avoids extra sort work as queue size grows.Suggested fix
- """CREATE INDEX idx_reprocessing_request_1 ON reprocessing_request (status)""" + """CREATE INDEX idx_reprocessing_request_1 ON reprocessing_request (status, created_at)"""🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/python/T0/WMBS/Oracle/Create.py` around lines 550 - 551, Replace the single-column index creation assigned to self.indexes[...] (currently creating idx_reprocessing_request_1 on reprocessing_request (status)) with a composite index on (status, created_at) to match the polling query order; update the index name (e.g. idx_reprocessing_request_status_created_at) and the CREATE INDEX statement to "CREATE INDEX idx_reprocessing_request_status_created_at ON reprocessing_request (status, created_at)" so the database can avoid extra sorting when queries use WHERE status = ... ORDER BY created_at.src/python/T0Component/Reprocessing/Reprocessing.py (1)
30-37: ⚡ Quick winReplace
print(...)here bypasses component logging/formatting controls; uselogging.info/debuginstead.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/python/T0Component/Reprocessing/Reprocessing.py` around lines 30 - 37, Replace the two print(...) calls in Reprocessing.__init__ and Reprocessing.preInitialization with proper logging calls (e.g., logging.getLogger(__name__).info(...) or the component instance logger if available, e.g., self.logger.info(...)) so messages respect configured logging/formatting; update the message text to include the method name (Reprocessing.__init__ and Reprocessing.preInitialization) and choose info or debug level accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/python/T0/RunConfig/Tier0Config.py`:
- Around line 1357-1359: The guard that checks only "if requestIndex >=
len(config.Global.ReprocessingRequests)" allows negative indices, so update all
reprocessing setter functions (e.g., where you access
config.Global.ReprocessingRequests[requestIndex] such as the WriteTiers setter
and the similar blocks at the other noted locations) to also reject negative
values; add a check like "if requestIndex < 0 or requestIndex >=
len(config.Global.ReprocessingRequests): raise RuntimeError(...)" so invalid
negative indexes are refused before assigning to
config.Global.ReprocessingRequests[requestIndex]['WriteTiers'] (and the
analogous keys in the other setters).
- Around line 1301-1345: The addReprocessingRequest helper currently accepts
invalid requests; add upfront validation inside the addReprocessingRequest
function to enforce the documented invariants: verify inputDataset is a
non-empty string that ends with "/RAW", ensure globalTag, cmsswVersion,
scramArch and scenario are non-empty strings, confirm procVersion is an integer
> 0, validate RecoSplit and Multicore (options.get('recoSplit') and
options.get('multicore')) are positive integers, and ensure list-valued options
like writeTiers/alcaSkims/physicsSkims/dqmSequences are actual lists (and
writeTiers is not empty); if any check fails, raise a ValueError (or TypeError
for bad types) before appending to config.Global.ReprocessingRequests so only
valid requests are added.
In `@src/python/T0Component/Reprocessing/ReprocessingPoller.py`:
- Around line 142-147: The in-memory dedup key (requestKey) in
ReprocessingPoller using only (InputDataset, ProcVersion) is too narrow and
process-local (self.insertedRequests), causing dropped distinct requests and
duplicates after restarts; update the dedup key construction in the loop that
builds requestKey to include additional distinguishing fields such as
RunRange/RunNumber, LumiRange/Lumi, RequestName and any Options/ExtraArgs
present in the request, and stop relying solely on self.insertedRequests by
adding a DAO existence check (or enforcing/using a DB unique constraint) before
inserting so the check survives process restarts; apply the same change to the
other dedup usage around the second occurrence (the block at the other reported
location).
- Around line 581-583: The loop over notClosedOut uses a fragile substring check
`if workflowName in str(workflow)` which yields false positives; change it to
compare the workflowName against a concrete workflow identifier field (e.g., use
`workflow.get('name') == workflowName` or `workflow.get('workflowName') ==
workflowName` or compare IDs like `workflow.get('id') == workflowName`) inside
the for-loop that sets workflowDone, so you perform an exact match (or a clearly
delimited match such as equality on a parsed field) instead of converting the
whole workflow object to string; update the check in the block that assigns
workflowDone to use that explicit attribute comparison.
- Around line 290-317: The DB transaction is opened before calling the external
DataStagingAPI.createStagingRules, which can block DB resources; change the flow
so createStagingRules is called entirely outside the DB transaction, then begin
myThread.transaction only when you need to persist the resulting rule
IDs/status. Concretely: call DataStagingAPI.createStagingRules(...) first
(handle exceptions and set an appropriate failure/cleanup path), then call
myThread.transaction.begin(), call updateStatusDAO.execute(requestId, "STAGING",
rucioRuleId=ruleIdStr, ...) or updateStatusDAO.execute(..., "READY", ...) as
needed, and finally myThread.transaction.commit(); keep the logging and ensure
rollback is used on transaction errors. Reference symbols:
myThread.transaction.begin/commit, DataStagingAPI.createStagingRules,
updateStatusDAO.execute, ruleIdStr.
In `@test/python/T0_t/Reprocessing_t/ReprocessingPoller_t.py`:
- Around line 121-154: Extend test_duplicate_request_skipped in
ReprocessingPoller_t to also cover distinct requests that share InputDataset and
ProcVersion but differ in run/lumi: create two request dicts where
'InputDataset' and 'ProcVersion' are identical but 'RunId'/'MinLumi'/'MaxLumi'
differ, set mockConfig.Global.ReprocessingRequests to [req1, req2], keep
daoFactory returning mockInsertDAO and call insertConfigRequests(self.poller,
mockConfig) the same way (using patched threading.currentThread), and assert
mockInsertDAO.execute.call_count == 2 to verify both were inserted; keep the
original duplicate-identical check as well.
---
Nitpick comments:
In `@src/python/T0/WMBS/Oracle/Create.py`:
- Around line 550-551: Replace the single-column index creation assigned to
self.indexes[...] (currently creating idx_reprocessing_request_1 on
reprocessing_request (status)) with a composite index on (status, created_at) to
match the polling query order; update the index name (e.g.
idx_reprocessing_request_status_created_at) and the CREATE INDEX statement to
"CREATE INDEX idx_reprocessing_request_status_created_at ON reprocessing_request
(status, created_at)" so the database can avoid extra sorting when queries use
WHERE status = ... ORDER BY created_at.
In `@src/python/T0Component/Reprocessing/Reprocessing.py`:
- Around line 30-37: Replace the two print(...) calls in Reprocessing.__init__
and Reprocessing.preInitialization with proper logging calls (e.g.,
logging.getLogger(__name__).info(...) or the component instance logger if
available, e.g., self.logger.info(...)) so messages respect configured
logging/formatting; update the message text to include the method name
(Reprocessing.__init__ and Reprocessing.preInitialization) and choose info or
debug level accordingly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: daf53656-0371-46b5-855e-3c613fa2b114
📒 Files selected for processing (17)
etc/ReprocessingConfiguration.pysrc/python/T0/Reprocessing/DataStagingAPI.pysrc/python/T0/Reprocessing/__init__.pysrc/python/T0/RunConfig/Tier0Config.pysrc/python/T0/WMBS/Oracle/Create.pysrc/python/T0/WMBS/Oracle/Reprocessing/GetReprocessingByStatus.pysrc/python/T0/WMBS/Oracle/Reprocessing/GetReprocessingRequest.pysrc/python/T0/WMBS/Oracle/Reprocessing/InsertReprocessingRequest.pysrc/python/T0/WMBS/Oracle/Reprocessing/ListReprocessingRequests.pysrc/python/T0/WMBS/Oracle/Reprocessing/UpdateReprocessingStatus.pysrc/python/T0/WMBS/Oracle/Reprocessing/__init__.pysrc/python/T0Component/Reprocessing/Reprocessing.pysrc/python/T0Component/Reprocessing/ReprocessingPoller.pysrc/python/T0Component/Reprocessing/__init__.pytest/python/T0_t/Reprocessing_t/DataStagingAPI_t.pytest/python/T0_t/Reprocessing_t/ReprocessingPoller_t.pytest/python/T0_t/Reprocessing_t/__init__.py
| def addReprocessingRequest(config, inputDataset, globalTag, cmsswVersion, | ||
| scramArch, procVersion, scenario, **options): | ||
| """ | ||
| _addReprocessingRequest_ | ||
|
|
||
| Add a reprocessing-from-RAW request to the configuration. | ||
|
|
||
| Required parameters: | ||
| inputDataset - DBS dataset path, must end in /RAW | ||
| globalTag - conditions GlobalTag | ||
| cmsswVersion - CMSSW release to use | ||
| scramArch - SCRAM architecture | ||
| procVersion - processing version (int) | ||
| scenario - CMSSW scenario name | ||
|
|
||
| Optional parameters (via **options): | ||
| writeTiers - list of output tiers (default: ["AOD", "MINIAOD", "DQM"]) | ||
| alcaSkims - list of AlcaSkim names (default: []) | ||
| physicsSkims - list of physics skim names (default: []) | ||
| dqmSequences - list of DQM sequences (default: []) | ||
| nanoFlavour - NanoAOD flavour (default: None) | ||
| recoSplit - events per job (default: 500) | ||
| multicore - number of cores (default: 8) | ||
| """ | ||
| request = { | ||
| 'InputDataset': inputDataset, | ||
| 'GlobalTag': globalTag, | ||
| 'CMSSWVersion': cmsswVersion, | ||
| 'ScramArch': scramArch, | ||
| 'ProcVersion': procVersion, | ||
| 'Scenario': scenario, | ||
| 'WriteTiers': options.get('writeTiers', ["AOD", "MINIAOD", "DQM"]), | ||
| 'AlcaSkims': options.get('alcaSkims', []), | ||
| 'PhysicsSkims': options.get('physicsSkims', []), | ||
| 'DqmSequences': options.get('dqmSequences', []), | ||
| 'NanoFlavour': options.get('nanoFlavour', None), | ||
| 'RecoSplit': options.get('recoSplit', 500), | ||
| 'Multicore': options.get('multicore', 8), | ||
| 'RunId': None, | ||
| 'MinLumi': None, | ||
| 'MaxLumi': None, | ||
| } | ||
|
|
||
| config.Global.ReprocessingRequests.append(request) | ||
| return |
There was a problem hiding this comment.
Enforce required request invariants at creation time.
addReprocessingRequest documents strict requirements (e.g., RAW dataset), but currently accepts invalid values and defers failure downstream.
Suggested fix
def addReprocessingRequest(config, inputDataset, globalTag, cmsswVersion,
scramArch, procVersion, scenario, **options):
@@
+ if not isinstance(inputDataset, str) or not inputDataset.endswith("/RAW"):
+ raise RuntimeError("inputDataset must be a DBS dataset path ending in /RAW")
+ if not isinstance(procVersion, int) or procVersion < 1:
+ raise RuntimeError("procVersion must be a positive integer")
+
+ writeTiers = options.get('writeTiers', ["AOD", "MINIAOD", "DQM"])
+ if not isinstance(writeTiers, list) or len(writeTiers) == 0:
+ raise RuntimeError("writeTiers must be a non-empty list")
+
request = {
@@
- 'WriteTiers': options.get('writeTiers', ["AOD", "MINIAOD", "DQM"]),
+ 'WriteTiers': writeTiers,🧰 Tools
🪛 Pylint (4.0.5)
[refactor] 1301-1301: Too many positional arguments (7/5)
(R0917)
[refactor] 1301-1301: Useless return at end of function or method
(R1711)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/python/T0/RunConfig/Tier0Config.py` around lines 1301 - 1345, The
addReprocessingRequest helper currently accepts invalid requests; add upfront
validation inside the addReprocessingRequest function to enforce the documented
invariants: verify inputDataset is a non-empty string that ends with "/RAW",
ensure globalTag, cmsswVersion, scramArch and scenario are non-empty strings,
confirm procVersion is an integer > 0, validate RecoSplit and Multicore
(options.get('recoSplit') and options.get('multicore')) are positive integers,
and ensure list-valued options like
writeTiers/alcaSkims/physicsSkims/dqmSequences are actual lists (and writeTiers
is not empty); if any check fails, raise a ValueError (or TypeError for bad
types) before appending to config.Global.ReprocessingRequests so only valid
requests are added.
| if requestIndex >= len(config.Global.ReprocessingRequests): | ||
| raise RuntimeError("Reprocessing request index %d out of range" % requestIndex) | ||
| config.Global.ReprocessingRequests[requestIndex]['WriteTiers'] = writeTiers |
There was a problem hiding this comment.
Reject negative request indexes in all reprocessing setters.
Current guards only check >= len(...); negative indexes are valid in Python and will silently update the wrong request.
Suggested fix
- if requestIndex >= len(config.Global.ReprocessingRequests):
+ if not isinstance(requestIndex, int) or requestIndex < 0 or requestIndex >= len(config.Global.ReprocessingRequests):
raise RuntimeError("Reprocessing request index %d out of range" % requestIndex)
@@
- if requestIndex >= len(config.Global.ReprocessingRequests):
+ if not isinstance(requestIndex, int) or requestIndex < 0 or requestIndex >= len(config.Global.ReprocessingRequests):
raise RuntimeError("Reprocessing request index %d out of range" % requestIndex)
@@
- if requestIndex >= len(config.Global.ReprocessingRequests):
+ if not isinstance(requestIndex, int) or requestIndex < 0 or requestIndex >= len(config.Global.ReprocessingRequests):
raise RuntimeError("Reprocessing request index %d out of range" % requestIndex)Also applies to: 1372-1374, 1389-1391
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/python/T0/RunConfig/Tier0Config.py` around lines 1357 - 1359, The guard
that checks only "if requestIndex >= len(config.Global.ReprocessingRequests)"
allows negative indices, so update all reprocessing setter functions (e.g.,
where you access config.Global.ReprocessingRequests[requestIndex] such as the
WriteTiers setter and the similar blocks at the other noted locations) to also
reject negative values; add a check like "if requestIndex < 0 or requestIndex >=
len(config.Global.ReprocessingRequests): raise RuntimeError(...)" so invalid
negative indexes are refused before assigning to
config.Global.ReprocessingRequests[requestIndex]['WriteTiers'] (and the
analogous keys in the other setters).
| for idx, request in enumerate(reprocessingConfig.Global.ReprocessingRequests): | ||
| requestKey = (request.get('InputDataset', ''), | ||
| request.get('ProcVersion', 0)) | ||
|
|
||
| if requestKey in self.insertedRequests: | ||
| continue |
There was a problem hiding this comment.
Dedup key is too narrow and process-local, causing duplicate/missed inserts.
Using only (InputDataset, ProcVersion) can drop valid distinct requests (different run/lumi/options), and self.insertedRequests resets on restart so the same config can be inserted again into DB.
Suggested direction
- requestKey = (request.get('InputDataset', ''),
- request.get('ProcVersion', 0))
+ requestKey = (
+ request.get('InputDataset', ''),
+ request.get('ProcVersion', 0),
+ request.get('RunId', None),
+ request.get('MinLumi', None),
+ request.get('MaxLumi', None),
+ request.get('GlobalTag', None),
+ request.get('CMSSWVersion', None),
+ request.get('Scenario', None),
+ )Also add a DAO existence check (or DB unique constraint) so dedup survives process restarts.
Also applies to: 193-194
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/python/T0Component/Reprocessing/ReprocessingPoller.py` around lines 142 -
147, The in-memory dedup key (requestKey) in ReprocessingPoller using only
(InputDataset, ProcVersion) is too narrow and process-local
(self.insertedRequests), causing dropped distinct requests and duplicates after
restarts; update the dedup key construction in the loop that builds requestKey
to include additional distinguishing fields such as RunRange/RunNumber,
LumiRange/Lumi, RequestName and any Options/ExtraArgs present in the request,
and stop relying solely on self.insertedRequests by adding a DAO existence check
(or enforcing/using a DB unique constraint) before inserting so the check
survives process restarts; apply the same change to the other dedup usage around
the second occurrence (the block at the other reported location).
| myThread.transaction.begin() | ||
|
|
||
| if locations['on_tape']: | ||
| # Need staging | ||
| ruleIds = DataStagingAPI.createStagingRules( | ||
| self.rucioClient, | ||
| locations['on_tape'], | ||
| self.processingSiteRSE, | ||
| lifetime = self.stagingLifetime, | ||
| account = self.rucioAccount | ||
| ) | ||
| ruleIdStr = ",".join(ruleIds) | ||
| updateStatusDAO.execute(requestId, "STAGING", | ||
| rucioRuleId = ruleIdStr, | ||
| conn = myThread.transaction.conn, | ||
| transaction = True) | ||
| logging.info("Request %d: staging %d blocks from tape", | ||
| requestId, len(locations['on_tape'])) | ||
| else: | ||
| # All on disk | ||
| updateStatusDAO.execute(requestId, "READY", | ||
| conn = myThread.transaction.conn, | ||
| transaction = True) | ||
| logging.info("Request %d: all data on disk, ready for processing", | ||
| requestId) | ||
|
|
||
| myThread.transaction.commit() | ||
|
|
There was a problem hiding this comment.
Avoid holding a DB transaction open across Rucio staging rule creation.
The transaction starts before createStagingRules(...), an external network call. This can hold DB resources during slow/failed remote operations and increase contention.
Suggested change
- myThread.transaction.begin()
-
if locations['on_tape']:
# Need staging
ruleIds = DataStagingAPI.createStagingRules(
self.rucioClient,
locations['on_tape'],
self.processingSiteRSE,
lifetime = self.stagingLifetime,
account = self.rucioAccount
)
ruleIdStr = ",".join(ruleIds)
+ myThread.transaction.begin()
updateStatusDAO.execute(requestId, "STAGING",
rucioRuleId = ruleIdStr,
conn = myThread.transaction.conn,
transaction = True)
@@
else:
# All on disk
+ myThread.transaction.begin()
updateStatusDAO.execute(requestId, "READY",
conn = myThread.transaction.conn,
transaction = True)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| myThread.transaction.begin() | |
| if locations['on_tape']: | |
| # Need staging | |
| ruleIds = DataStagingAPI.createStagingRules( | |
| self.rucioClient, | |
| locations['on_tape'], | |
| self.processingSiteRSE, | |
| lifetime = self.stagingLifetime, | |
| account = self.rucioAccount | |
| ) | |
| ruleIdStr = ",".join(ruleIds) | |
| updateStatusDAO.execute(requestId, "STAGING", | |
| rucioRuleId = ruleIdStr, | |
| conn = myThread.transaction.conn, | |
| transaction = True) | |
| logging.info("Request %d: staging %d blocks from tape", | |
| requestId, len(locations['on_tape'])) | |
| else: | |
| # All on disk | |
| updateStatusDAO.execute(requestId, "READY", | |
| conn = myThread.transaction.conn, | |
| transaction = True) | |
| logging.info("Request %d: all data on disk, ready for processing", | |
| requestId) | |
| myThread.transaction.commit() | |
| if locations['on_tape']: | |
| # Need staging | |
| ruleIds = DataStagingAPI.createStagingRules( | |
| self.rucioClient, | |
| locations['on_tape'], | |
| self.processingSiteRSE, | |
| lifetime = self.stagingLifetime, | |
| account = self.rucioAccount | |
| ) | |
| ruleIdStr = ",".join(ruleIds) | |
| myThread.transaction.begin() | |
| updateStatusDAO.execute(requestId, "STAGING", | |
| rucioRuleId = ruleIdStr, | |
| conn = myThread.transaction.conn, | |
| transaction = True) | |
| logging.info("Request %d: staging %d blocks from tape", | |
| requestId, len(locations['on_tape'])) | |
| else: | |
| # All on disk | |
| myThread.transaction.begin() | |
| updateStatusDAO.execute(requestId, "READY", | |
| conn = myThread.transaction.conn, | |
| transaction = True) | |
| logging.info("Request %d: all data on disk, ready for processing", | |
| requestId) | |
| myThread.transaction.commit() |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/python/T0Component/Reprocessing/ReprocessingPoller.py` around lines 290 -
317, The DB transaction is opened before calling the external
DataStagingAPI.createStagingRules, which can block DB resources; change the flow
so createStagingRules is called entirely outside the DB transaction, then begin
myThread.transaction only when you need to persist the resulting rule
IDs/status. Concretely: call DataStagingAPI.createStagingRules(...) first
(handle exceptions and set an appropriate failure/cleanup path), then call
myThread.transaction.begin(), call updateStatusDAO.execute(requestId, "STAGING",
rucioRuleId=ruleIdStr, ...) or updateStatusDAO.execute(..., "READY", ...) as
needed, and finally myThread.transaction.commit(); keep the logging and ensure
rollback is used on transaction errors. Reference symbols:
myThread.transaction.begin/commit, DataStagingAPI.createStagingRules,
updateStatusDAO.execute, ruleIdStr.
| for workflow in notClosedOut: | ||
| if workflowName in str(workflow): | ||
| workflowDone = False |
There was a problem hiding this comment.
Workflow completion check uses fragile substring matching.
if workflowName in str(workflow) can produce false matches (e.g., shared prefixes) and mark requests DONE incorrectly.
Suggested fix
- for workflow in notClosedOut:
- if workflowName in str(workflow):
+ for workflow in notClosedOut:
+ candidate = workflow.get('name') if isinstance(workflow, dict) else workflow[0]
+ if workflowName == candidate:
workflowDone = False
break🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/python/T0Component/Reprocessing/ReprocessingPoller.py` around lines 581 -
583, The loop over notClosedOut uses a fragile substring check `if workflowName
in str(workflow)` which yields false positives; change it to compare the
workflowName against a concrete workflow identifier field (e.g., use
`workflow.get('name') == workflowName` or `workflow.get('workflowName') ==
workflowName` or compare IDs like `workflow.get('id') == workflowName`) inside
the for-loop that sets workflowDone, so you perform an exact match (or a clearly
delimited match such as equality on a parsed field) instead of converting the
whole workflow object to string; update the check in the block that assigns
workflowDone to use that explicit attribute comparison.
| def test_duplicate_request_skipped(self): | ||
| """Same dataset+procversion should only be inserted once.""" | ||
| mockConfig = MagicMock() | ||
| request = { | ||
| 'InputDataset': '/ZeroBias/Run2024A-v1/RAW', | ||
| 'GlobalTag': '140X_dataRun3_PromptNew_v2', | ||
| 'CMSSWVersion': 'CMSSW_14_0_17', | ||
| 'ScramArch': 'el8_amd64_gcc12', | ||
| 'ProcVersion': 2, | ||
| 'Scenario': 'ppEra_Run3', | ||
| 'WriteTiers': ['AOD', 'MINIAOD', 'DQM'], | ||
| 'AlcaSkims': [], | ||
| 'PhysicsSkims': [], | ||
| 'DqmSequences': [], | ||
| 'NanoFlavour': None, | ||
| 'RecoSplit': 500, | ||
| 'Multicore': 8, | ||
| 'RunId': None, | ||
| 'MinLumi': None, | ||
| 'MaxLumi': None, | ||
| } | ||
| mockConfig.Global.ReprocessingRequests = [request, request] | ||
|
|
||
| mockInsertDAO = MagicMock() | ||
| self.poller.daoFactory = MagicMock(return_value=mockInsertDAO) | ||
|
|
||
| import threading | ||
| mockThread = MagicMock() | ||
| with patch.object(threading, 'currentThread', return_value=mockThread): | ||
| self.poller.insertConfigRequests(mockConfig) | ||
|
|
||
| # Should only be called once despite two identical requests | ||
| self.assertEqual(mockInsertDAO.execute.call_count, 1) | ||
|
|
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Add a test for distinct requests sharing dataset+procVersion.
Current duplicate test only covers identical objects. Please add a case where InputDataset/ProcVersion are equal but run/lumi differ, and assert both are inserted.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@test/python/T0_t/Reprocessing_t/ReprocessingPoller_t.py` around lines 121 -
154, Extend test_duplicate_request_skipped in ReprocessingPoller_t to also cover
distinct requests that share InputDataset and ProcVersion but differ in
run/lumi: create two request dicts where 'InputDataset' and 'ProcVersion' are
identical but 'RunId'/'MinLumi'/'MaxLumi' differ, set
mockConfig.Global.ReprocessingRequests to [req1, req2], keep daoFactory
returning mockInsertDAO and call insertConfigRequests(self.poller, mockConfig)
the same way (using patched threading.currentThread), and assert
mockInsertDAO.execute.call_count == 2 to verify both were inserted; keep the
original duplicate-identical check as well.
Summary by CodeRabbit
Release Notes
New Features
Tests