Skip to content

Reprocessing#7

Open
Viphava280444 wants to merge 17 commits into
masterfrom
Reprocessing
Open

Reprocessing#7
Viphava280444 wants to merge 17 commits into
masterfrom
Reprocessing

Conversation

@Viphava280444
Copy link
Copy Markdown
Owner

@Viphava280444 Viphava280444 commented Mar 17, 2026

Summary by CodeRabbit

Release Notes

  • New Features

    • Added reprocessing from RAW datasets with configuration management for input datasets, processing versions, and output parameters
    • Introduced data staging API for discovering RAW files, classifying storage locations, and orchestrating tape recalls
    • New reprocessing component for managing request lifecycle from submission through completion tracking
  • Tests

    • Added comprehensive unit tests for data staging and reprocessing workflows

Review Change Stack

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 18, 2026

📝 Walkthrough

Walkthrough

This PR implements a complete reprocessing-from-RAW subsystem for Tier-0. It adds configuration helpers and an example file, Oracle schema and DAOs for request tracking, DBS/Rucio data staging functions, a multi-state ReprocessingPoller orchestrator that manages the workflow from submission through workload injection and completion, and a Harness component wrapper. Comprehensive unit tests validate configuration, data staging, and state transitions.

Changes

Reprocessing from RAW feature

Layer / File(s) Summary
Configuration helpers and setup
src/python/T0/RunConfig/Tier0Config.py, etc/ReprocessingConfiguration.py, src/python/T0/Reprocessing/__init__.py
Tier0Config.createTier0Config() initializes Global.ReprocessingRequests. New helpers addReprocessingRequest, setReprocessingOutputTiers, setReprocessingAlcaSkims, and setReprocessingRunFilter build and customize request objects. Example configuration file demonstrates usage.
Oracle schema and data access objects
src/python/T0/WMBS/Oracle/Create.py, src/python/T0/WMBS/Oracle/Reprocessing/*
reprocessing_request table stores request state with identity primary key, dataset/config fields, status, Rucio rule ID, workflow name, and timestamps. Five DAOs (GetReprocessingByStatus, GetReprocessingRequest, ListReprocessingRequests, InsertReprocessingRequest, UpdateReprocessingStatus) provide parameterized queries and mutations.
Data staging API: DBS discovery and Rucio staging
src/python/T0/Reprocessing/DataStagingAPI.py, test/python/T0_t/Reprocessing_t/DataStagingAPI_t.py
discoverRAWFiles queries DBS3 for files (optionally filtered by run/lumi) grouped by block. classifyFileLocations queries Rucio replicas to classify blocks as on_disk, on_tape, or missing. createStagingRules creates asynchronous Rucio replication rules per block. checkStagingStatus aggregates rule states into overall status. deleteStagingRule removes rules with replica purge. Tests cover error cases, classification scenarios, rule parameters, and status polling.
ReprocessingPoller state machine
src/python/T0Component/Reprocessing/ReprocessingPoller.py, test/python/T0_t/Reprocessing_t/ReprocessingPoller_t.py
Multi-state polling orchestrator: inserts config-defined requests, progresses SUBMITTED→VALIDATED (discover RAW files), VALIDATED→READY/STAGING (classify replicas, create Rucio rules), STAGING→STAGED/FAILED (monitor completion), READY/STAGED→INJECTED (build PromptReco workloads and WMBS subscriptions), INJECTED→DONE (poll feeder workflow closure). Tests verify request insertion deduplication, validation of required fields and RAW tier constraint, and Tier0Config helper function behavior.
Harness component lifecycle
src/python/T0Component/Reprocessing/Reprocessing.py, src/python/T0Component/Reprocessing/__init__.py
Reprocessing Harness subclass sets pollTime=1 and registers ReprocessingPoller worker thread during preInitialization, delegating workflow request processing to the poller worker.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 A new subsystem hops into place,
Reprocessing RAW at a steady pace,
DBS and Rucio dance in the light,
State machines whirl from morning to night,
WMBS workflows leap up and take flight! 🚀

🚥 Pre-merge checks | ✅ 2 | ❌ 3

❌ Failed checks (2 warnings, 1 inconclusive)

Check name Status Explanation Resolution
Description check ⚠️ Warning No pull request description was provided; the author did not complete the required 'Replay Request' template with requestor, configuration details, or purpose information. Add a comprehensive pull request description following the 'Replay Request' template, including requestor details, configuration (release, run, GTs), purpose of the change, and any relevant cmsTalk thread links.
Docstring Coverage ⚠️ Warning Docstring coverage is 78.26% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The title 'Reprocessing' is vague and generic; it does not convey specific meaningful information about what the changeset accomplishes beyond a broad category name. Use a more descriptive title that explains the primary change, such as 'Add reprocessing-from-RAW subsystem with configuration, data staging, and state machine'.
✅ Passed checks (2 passed)
Check name Status Explanation
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch Reprocessing

Comment @coderabbitai help to get the list of available commands and usage tips.

@Viphava280444
Copy link
Copy Markdown
Owner Author

@coderabbitai

@Viphava280444
Copy link
Copy Markdown
Owner Author

@coderabbitai help

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 18, 2026

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

CodeRabbit commands

These commands are invoked using PR/Issue comments.

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai rate limit to show your current review rate limit status.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai evaluate custom pre-merge check --instructions <custom-checks-instructions> --name <custom-checks-title> [--mode <error|warning>] to validate the custom pre-merge checks instructions. Defaults to error when --mode is omitted.
  • @coderabbitai ignore pre-merge checks to override pre-merge checks and get an approval on PR.
  • @coderabbitai run pre-merge checks to run pre-merge checks on the pull request.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve merge conflict to automatically resolve merge conflicts.
  • @coderabbitai autofix to automatically fix issues identified in unresolved review comments.
  • @coderabbitai resolve to resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai generate configuration to create a PR that adds the current resolved configuration as .coderabbit.yaml (or show it if already present).
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit configuration file (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • You can also validate your configuration using the online YAML validator.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, support, documentation and community

  • Visit our status page to check the current availability of CodeRabbit.
  • Create a ticket on our support page for assistance with any issues or questions.
  • Visit our documentation site for detailed information on how to use CodeRabbit.
  • Join our Discord community to connect with other users and get help from the community.
  • Follow us on X/Twitter for updates and announcements.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 18, 2026

Tip

For best results, initiate chat on the files or code changes.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
gh pr view 7 --repo Viphava280444/Tier-0

Length of output: 76


@Viphava280444 It looks like you mentioned me but didn't include a specific request. Here are some things I can help with:

  • Trigger a review — just say @coderabbitai review
  • Generate a summary — say @coderabbitai summary
  • Answer questions about the code or PR
  • Help with code changes or planning

What would you like me to do? 😊

(ᵔᴥᵔ)🐇

@Viphava280444
Copy link
Copy Markdown
Owner Author

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 18, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Nitpick comments (2)
src/python/T0/WMBS/Oracle/Create.py (1)

550-551: ⚡ Quick win

Use a composite index to match status polling query order.

Status polling reads WHERE status = ... ORDER BY created_at; indexing (status, created_at) avoids extra sort work as queue size grows.

Suggested fix
-            """CREATE INDEX idx_reprocessing_request_1 ON reprocessing_request (status)"""
+            """CREATE INDEX idx_reprocessing_request_1 ON reprocessing_request (status, created_at)"""
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/python/T0/WMBS/Oracle/Create.py` around lines 550 - 551, Replace the
single-column index creation assigned to self.indexes[...] (currently creating
idx_reprocessing_request_1 on reprocessing_request (status)) with a composite
index on (status, created_at) to match the polling query order; update the index
name (e.g. idx_reprocessing_request_status_created_at) and the CREATE INDEX
statement to "CREATE INDEX idx_reprocessing_request_status_created_at ON
reprocessing_request (status, created_at)" so the database can avoid extra
sorting when queries use WHERE status = ... ORDER BY created_at.
src/python/T0Component/Reprocessing/Reprocessing.py (1)

30-37: ⚡ Quick win

Replace print diagnostics with logger calls.

print(...) here bypasses component logging/formatting controls; use logging.info/debug instead.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/python/T0Component/Reprocessing/Reprocessing.py` around lines 30 - 37,
Replace the two print(...) calls in Reprocessing.__init__ and
Reprocessing.preInitialization with proper logging calls (e.g.,
logging.getLogger(__name__).info(...) or the component instance logger if
available, e.g., self.logger.info(...)) so messages respect configured
logging/formatting; update the message text to include the method name
(Reprocessing.__init__ and Reprocessing.preInitialization) and choose info or
debug level accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/python/T0/RunConfig/Tier0Config.py`:
- Around line 1357-1359: The guard that checks only "if requestIndex >=
len(config.Global.ReprocessingRequests)" allows negative indices, so update all
reprocessing setter functions (e.g., where you access
config.Global.ReprocessingRequests[requestIndex] such as the WriteTiers setter
and the similar blocks at the other noted locations) to also reject negative
values; add a check like "if requestIndex < 0 or requestIndex >=
len(config.Global.ReprocessingRequests): raise RuntimeError(...)" so invalid
negative indexes are refused before assigning to
config.Global.ReprocessingRequests[requestIndex]['WriteTiers'] (and the
analogous keys in the other setters).
- Around line 1301-1345: The addReprocessingRequest helper currently accepts
invalid requests; add upfront validation inside the addReprocessingRequest
function to enforce the documented invariants: verify inputDataset is a
non-empty string that ends with "/RAW", ensure globalTag, cmsswVersion,
scramArch and scenario are non-empty strings, confirm procVersion is an integer
> 0, validate RecoSplit and Multicore (options.get('recoSplit') and
options.get('multicore')) are positive integers, and ensure list-valued options
like writeTiers/alcaSkims/physicsSkims/dqmSequences are actual lists (and
writeTiers is not empty); if any check fails, raise a ValueError (or TypeError
for bad types) before appending to config.Global.ReprocessingRequests so only
valid requests are added.

In `@src/python/T0Component/Reprocessing/ReprocessingPoller.py`:
- Around line 142-147: The in-memory dedup key (requestKey) in
ReprocessingPoller using only (InputDataset, ProcVersion) is too narrow and
process-local (self.insertedRequests), causing dropped distinct requests and
duplicates after restarts; update the dedup key construction in the loop that
builds requestKey to include additional distinguishing fields such as
RunRange/RunNumber, LumiRange/Lumi, RequestName and any Options/ExtraArgs
present in the request, and stop relying solely on self.insertedRequests by
adding a DAO existence check (or enforcing/using a DB unique constraint) before
inserting so the check survives process restarts; apply the same change to the
other dedup usage around the second occurrence (the block at the other reported
location).
- Around line 581-583: The loop over notClosedOut uses a fragile substring check
`if workflowName in str(workflow)` which yields false positives; change it to
compare the workflowName against a concrete workflow identifier field (e.g., use
`workflow.get('name') == workflowName` or `workflow.get('workflowName') ==
workflowName` or compare IDs like `workflow.get('id') == workflowName`) inside
the for-loop that sets workflowDone, so you perform an exact match (or a clearly
delimited match such as equality on a parsed field) instead of converting the
whole workflow object to string; update the check in the block that assigns
workflowDone to use that explicit attribute comparison.
- Around line 290-317: The DB transaction is opened before calling the external
DataStagingAPI.createStagingRules, which can block DB resources; change the flow
so createStagingRules is called entirely outside the DB transaction, then begin
myThread.transaction only when you need to persist the resulting rule
IDs/status. Concretely: call DataStagingAPI.createStagingRules(...) first
(handle exceptions and set an appropriate failure/cleanup path), then call
myThread.transaction.begin(), call updateStatusDAO.execute(requestId, "STAGING",
rucioRuleId=ruleIdStr, ...) or updateStatusDAO.execute(..., "READY", ...) as
needed, and finally myThread.transaction.commit(); keep the logging and ensure
rollback is used on transaction errors. Reference symbols:
myThread.transaction.begin/commit, DataStagingAPI.createStagingRules,
updateStatusDAO.execute, ruleIdStr.

In `@test/python/T0_t/Reprocessing_t/ReprocessingPoller_t.py`:
- Around line 121-154: Extend test_duplicate_request_skipped in
ReprocessingPoller_t to also cover distinct requests that share InputDataset and
ProcVersion but differ in run/lumi: create two request dicts where
'InputDataset' and 'ProcVersion' are identical but 'RunId'/'MinLumi'/'MaxLumi'
differ, set mockConfig.Global.ReprocessingRequests to [req1, req2], keep
daoFactory returning mockInsertDAO and call insertConfigRequests(self.poller,
mockConfig) the same way (using patched threading.currentThread), and assert
mockInsertDAO.execute.call_count == 2 to verify both were inserted; keep the
original duplicate-identical check as well.

---

Nitpick comments:
In `@src/python/T0/WMBS/Oracle/Create.py`:
- Around line 550-551: Replace the single-column index creation assigned to
self.indexes[...] (currently creating idx_reprocessing_request_1 on
reprocessing_request (status)) with a composite index on (status, created_at) to
match the polling query order; update the index name (e.g.
idx_reprocessing_request_status_created_at) and the CREATE INDEX statement to
"CREATE INDEX idx_reprocessing_request_status_created_at ON reprocessing_request
(status, created_at)" so the database can avoid extra sorting when queries use
WHERE status = ... ORDER BY created_at.

In `@src/python/T0Component/Reprocessing/Reprocessing.py`:
- Around line 30-37: Replace the two print(...) calls in Reprocessing.__init__
and Reprocessing.preInitialization with proper logging calls (e.g.,
logging.getLogger(__name__).info(...) or the component instance logger if
available, e.g., self.logger.info(...)) so messages respect configured
logging/formatting; update the message text to include the method name
(Reprocessing.__init__ and Reprocessing.preInitialization) and choose info or
debug level accordingly.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: daf53656-0371-46b5-855e-3c613fa2b114

📥 Commits

Reviewing files that changed from the base of the PR and between 28f8b0b and 4a08717.

📒 Files selected for processing (17)
  • etc/ReprocessingConfiguration.py
  • src/python/T0/Reprocessing/DataStagingAPI.py
  • src/python/T0/Reprocessing/__init__.py
  • src/python/T0/RunConfig/Tier0Config.py
  • src/python/T0/WMBS/Oracle/Create.py
  • src/python/T0/WMBS/Oracle/Reprocessing/GetReprocessingByStatus.py
  • src/python/T0/WMBS/Oracle/Reprocessing/GetReprocessingRequest.py
  • src/python/T0/WMBS/Oracle/Reprocessing/InsertReprocessingRequest.py
  • src/python/T0/WMBS/Oracle/Reprocessing/ListReprocessingRequests.py
  • src/python/T0/WMBS/Oracle/Reprocessing/UpdateReprocessingStatus.py
  • src/python/T0/WMBS/Oracle/Reprocessing/__init__.py
  • src/python/T0Component/Reprocessing/Reprocessing.py
  • src/python/T0Component/Reprocessing/ReprocessingPoller.py
  • src/python/T0Component/Reprocessing/__init__.py
  • test/python/T0_t/Reprocessing_t/DataStagingAPI_t.py
  • test/python/T0_t/Reprocessing_t/ReprocessingPoller_t.py
  • test/python/T0_t/Reprocessing_t/__init__.py

Comment on lines +1301 to +1345
def addReprocessingRequest(config, inputDataset, globalTag, cmsswVersion,
scramArch, procVersion, scenario, **options):
"""
_addReprocessingRequest_

Add a reprocessing-from-RAW request to the configuration.

Required parameters:
inputDataset - DBS dataset path, must end in /RAW
globalTag - conditions GlobalTag
cmsswVersion - CMSSW release to use
scramArch - SCRAM architecture
procVersion - processing version (int)
scenario - CMSSW scenario name

Optional parameters (via **options):
writeTiers - list of output tiers (default: ["AOD", "MINIAOD", "DQM"])
alcaSkims - list of AlcaSkim names (default: [])
physicsSkims - list of physics skim names (default: [])
dqmSequences - list of DQM sequences (default: [])
nanoFlavour - NanoAOD flavour (default: None)
recoSplit - events per job (default: 500)
multicore - number of cores (default: 8)
"""
request = {
'InputDataset': inputDataset,
'GlobalTag': globalTag,
'CMSSWVersion': cmsswVersion,
'ScramArch': scramArch,
'ProcVersion': procVersion,
'Scenario': scenario,
'WriteTiers': options.get('writeTiers', ["AOD", "MINIAOD", "DQM"]),
'AlcaSkims': options.get('alcaSkims', []),
'PhysicsSkims': options.get('physicsSkims', []),
'DqmSequences': options.get('dqmSequences', []),
'NanoFlavour': options.get('nanoFlavour', None),
'RecoSplit': options.get('recoSplit', 500),
'Multicore': options.get('multicore', 8),
'RunId': None,
'MinLumi': None,
'MaxLumi': None,
}

config.Global.ReprocessingRequests.append(request)
return
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Enforce required request invariants at creation time.

addReprocessingRequest documents strict requirements (e.g., RAW dataset), but currently accepts invalid values and defers failure downstream.

Suggested fix
 def addReprocessingRequest(config, inputDataset, globalTag, cmsswVersion,
                            scramArch, procVersion, scenario, **options):
@@
+    if not isinstance(inputDataset, str) or not inputDataset.endswith("/RAW"):
+        raise RuntimeError("inputDataset must be a DBS dataset path ending in /RAW")
+    if not isinstance(procVersion, int) or procVersion < 1:
+        raise RuntimeError("procVersion must be a positive integer")
+
+    writeTiers = options.get('writeTiers', ["AOD", "MINIAOD", "DQM"])
+    if not isinstance(writeTiers, list) or len(writeTiers) == 0:
+        raise RuntimeError("writeTiers must be a non-empty list")
+
     request = {
@@
-        'WriteTiers': options.get('writeTiers', ["AOD", "MINIAOD", "DQM"]),
+        'WriteTiers': writeTiers,
🧰 Tools
🪛 Pylint (4.0.5)

[refactor] 1301-1301: Too many positional arguments (7/5)

(R0917)


[refactor] 1301-1301: Useless return at end of function or method

(R1711)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/python/T0/RunConfig/Tier0Config.py` around lines 1301 - 1345, The
addReprocessingRequest helper currently accepts invalid requests; add upfront
validation inside the addReprocessingRequest function to enforce the documented
invariants: verify inputDataset is a non-empty string that ends with "/RAW",
ensure globalTag, cmsswVersion, scramArch and scenario are non-empty strings,
confirm procVersion is an integer > 0, validate RecoSplit and Multicore
(options.get('recoSplit') and options.get('multicore')) are positive integers,
and ensure list-valued options like
writeTiers/alcaSkims/physicsSkims/dqmSequences are actual lists (and writeTiers
is not empty); if any check fails, raise a ValueError (or TypeError for bad
types) before appending to config.Global.ReprocessingRequests so only valid
requests are added.

Comment on lines +1357 to +1359
if requestIndex >= len(config.Global.ReprocessingRequests):
raise RuntimeError("Reprocessing request index %d out of range" % requestIndex)
config.Global.ReprocessingRequests[requestIndex]['WriteTiers'] = writeTiers
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject negative request indexes in all reprocessing setters.

Current guards only check >= len(...); negative indexes are valid in Python and will silently update the wrong request.

Suggested fix
-    if requestIndex >= len(config.Global.ReprocessingRequests):
+    if not isinstance(requestIndex, int) or requestIndex < 0 or requestIndex >= len(config.Global.ReprocessingRequests):
         raise RuntimeError("Reprocessing request index %d out of range" % requestIndex)
@@
-    if requestIndex >= len(config.Global.ReprocessingRequests):
+    if not isinstance(requestIndex, int) or requestIndex < 0 or requestIndex >= len(config.Global.ReprocessingRequests):
         raise RuntimeError("Reprocessing request index %d out of range" % requestIndex)
@@
-    if requestIndex >= len(config.Global.ReprocessingRequests):
+    if not isinstance(requestIndex, int) or requestIndex < 0 or requestIndex >= len(config.Global.ReprocessingRequests):
         raise RuntimeError("Reprocessing request index %d out of range" % requestIndex)

Also applies to: 1372-1374, 1389-1391

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/python/T0/RunConfig/Tier0Config.py` around lines 1357 - 1359, The guard
that checks only "if requestIndex >= len(config.Global.ReprocessingRequests)"
allows negative indices, so update all reprocessing setter functions (e.g.,
where you access config.Global.ReprocessingRequests[requestIndex] such as the
WriteTiers setter and the similar blocks at the other noted locations) to also
reject negative values; add a check like "if requestIndex < 0 or requestIndex >=
len(config.Global.ReprocessingRequests): raise RuntimeError(...)" so invalid
negative indexes are refused before assigning to
config.Global.ReprocessingRequests[requestIndex]['WriteTiers'] (and the
analogous keys in the other setters).

Comment on lines +142 to +147
for idx, request in enumerate(reprocessingConfig.Global.ReprocessingRequests):
requestKey = (request.get('InputDataset', ''),
request.get('ProcVersion', 0))

if requestKey in self.insertedRequests:
continue
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Dedup key is too narrow and process-local, causing duplicate/missed inserts.

Using only (InputDataset, ProcVersion) can drop valid distinct requests (different run/lumi/options), and self.insertedRequests resets on restart so the same config can be inserted again into DB.

Suggested direction
-            requestKey = (request.get('InputDataset', ''),
-                          request.get('ProcVersion', 0))
+            requestKey = (
+                request.get('InputDataset', ''),
+                request.get('ProcVersion', 0),
+                request.get('RunId', None),
+                request.get('MinLumi', None),
+                request.get('MaxLumi', None),
+                request.get('GlobalTag', None),
+                request.get('CMSSWVersion', None),
+                request.get('Scenario', None),
+            )

Also add a DAO existence check (or DB unique constraint) so dedup survives process restarts.

Also applies to: 193-194

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/python/T0Component/Reprocessing/ReprocessingPoller.py` around lines 142 -
147, The in-memory dedup key (requestKey) in ReprocessingPoller using only
(InputDataset, ProcVersion) is too narrow and process-local
(self.insertedRequests), causing dropped distinct requests and duplicates after
restarts; update the dedup key construction in the loop that builds requestKey
to include additional distinguishing fields such as RunRange/RunNumber,
LumiRange/Lumi, RequestName and any Options/ExtraArgs present in the request,
and stop relying solely on self.insertedRequests by adding a DAO existence check
(or enforcing/using a DB unique constraint) before inserting so the check
survives process restarts; apply the same change to the other dedup usage around
the second occurrence (the block at the other reported location).

Comment on lines +290 to +317
myThread.transaction.begin()

if locations['on_tape']:
# Need staging
ruleIds = DataStagingAPI.createStagingRules(
self.rucioClient,
locations['on_tape'],
self.processingSiteRSE,
lifetime = self.stagingLifetime,
account = self.rucioAccount
)
ruleIdStr = ",".join(ruleIds)
updateStatusDAO.execute(requestId, "STAGING",
rucioRuleId = ruleIdStr,
conn = myThread.transaction.conn,
transaction = True)
logging.info("Request %d: staging %d blocks from tape",
requestId, len(locations['on_tape']))
else:
# All on disk
updateStatusDAO.execute(requestId, "READY",
conn = myThread.transaction.conn,
transaction = True)
logging.info("Request %d: all data on disk, ready for processing",
requestId)

myThread.transaction.commit()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid holding a DB transaction open across Rucio staging rule creation.

The transaction starts before createStagingRules(...), an external network call. This can hold DB resources during slow/failed remote operations and increase contention.

Suggested change
-                myThread.transaction.begin()
-
                 if locations['on_tape']:
                     # Need staging
                     ruleIds = DataStagingAPI.createStagingRules(
                         self.rucioClient,
                         locations['on_tape'],
                         self.processingSiteRSE,
                         lifetime = self.stagingLifetime,
                         account = self.rucioAccount
                     )
                     ruleIdStr = ",".join(ruleIds)
+                    myThread.transaction.begin()
                     updateStatusDAO.execute(requestId, "STAGING",
                                             rucioRuleId = ruleIdStr,
                                             conn = myThread.transaction.conn,
                                             transaction = True)
@@
                 else:
                     # All on disk
+                    myThread.transaction.begin()
                     updateStatusDAO.execute(requestId, "READY",
                                             conn = myThread.transaction.conn,
                                             transaction = True)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
myThread.transaction.begin()
if locations['on_tape']:
# Need staging
ruleIds = DataStagingAPI.createStagingRules(
self.rucioClient,
locations['on_tape'],
self.processingSiteRSE,
lifetime = self.stagingLifetime,
account = self.rucioAccount
)
ruleIdStr = ",".join(ruleIds)
updateStatusDAO.execute(requestId, "STAGING",
rucioRuleId = ruleIdStr,
conn = myThread.transaction.conn,
transaction = True)
logging.info("Request %d: staging %d blocks from tape",
requestId, len(locations['on_tape']))
else:
# All on disk
updateStatusDAO.execute(requestId, "READY",
conn = myThread.transaction.conn,
transaction = True)
logging.info("Request %d: all data on disk, ready for processing",
requestId)
myThread.transaction.commit()
if locations['on_tape']:
# Need staging
ruleIds = DataStagingAPI.createStagingRules(
self.rucioClient,
locations['on_tape'],
self.processingSiteRSE,
lifetime = self.stagingLifetime,
account = self.rucioAccount
)
ruleIdStr = ",".join(ruleIds)
myThread.transaction.begin()
updateStatusDAO.execute(requestId, "STAGING",
rucioRuleId = ruleIdStr,
conn = myThread.transaction.conn,
transaction = True)
logging.info("Request %d: staging %d blocks from tape",
requestId, len(locations['on_tape']))
else:
# All on disk
myThread.transaction.begin()
updateStatusDAO.execute(requestId, "READY",
conn = myThread.transaction.conn,
transaction = True)
logging.info("Request %d: all data on disk, ready for processing",
requestId)
myThread.transaction.commit()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/python/T0Component/Reprocessing/ReprocessingPoller.py` around lines 290 -
317, The DB transaction is opened before calling the external
DataStagingAPI.createStagingRules, which can block DB resources; change the flow
so createStagingRules is called entirely outside the DB transaction, then begin
myThread.transaction only when you need to persist the resulting rule
IDs/status. Concretely: call DataStagingAPI.createStagingRules(...) first
(handle exceptions and set an appropriate failure/cleanup path), then call
myThread.transaction.begin(), call updateStatusDAO.execute(requestId, "STAGING",
rucioRuleId=ruleIdStr, ...) or updateStatusDAO.execute(..., "READY", ...) as
needed, and finally myThread.transaction.commit(); keep the logging and ensure
rollback is used on transaction errors. Reference symbols:
myThread.transaction.begin/commit, DataStagingAPI.createStagingRules,
updateStatusDAO.execute, ruleIdStr.

Comment on lines +581 to +583
for workflow in notClosedOut:
if workflowName in str(workflow):
workflowDone = False
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Workflow completion check uses fragile substring matching.

if workflowName in str(workflow) can produce false matches (e.g., shared prefixes) and mark requests DONE incorrectly.

Suggested fix
-                for workflow in notClosedOut:
-                    if workflowName in str(workflow):
+                for workflow in notClosedOut:
+                    candidate = workflow.get('name') if isinstance(workflow, dict) else workflow[0]
+                    if workflowName == candidate:
                         workflowDone = False
                         break
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/python/T0Component/Reprocessing/ReprocessingPoller.py` around lines 581 -
583, The loop over notClosedOut uses a fragile substring check `if workflowName
in str(workflow)` which yields false positives; change it to compare the
workflowName against a concrete workflow identifier field (e.g., use
`workflow.get('name') == workflowName` or `workflow.get('workflowName') ==
workflowName` or compare IDs like `workflow.get('id') == workflowName`) inside
the for-loop that sets workflowDone, so you perform an exact match (or a clearly
delimited match such as equality on a parsed field) instead of converting the
whole workflow object to string; update the check in the block that assigns
workflowDone to use that explicit attribute comparison.

Comment on lines +121 to +154
def test_duplicate_request_skipped(self):
"""Same dataset+procversion should only be inserted once."""
mockConfig = MagicMock()
request = {
'InputDataset': '/ZeroBias/Run2024A-v1/RAW',
'GlobalTag': '140X_dataRun3_PromptNew_v2',
'CMSSWVersion': 'CMSSW_14_0_17',
'ScramArch': 'el8_amd64_gcc12',
'ProcVersion': 2,
'Scenario': 'ppEra_Run3',
'WriteTiers': ['AOD', 'MINIAOD', 'DQM'],
'AlcaSkims': [],
'PhysicsSkims': [],
'DqmSequences': [],
'NanoFlavour': None,
'RecoSplit': 500,
'Multicore': 8,
'RunId': None,
'MinLumi': None,
'MaxLumi': None,
}
mockConfig.Global.ReprocessingRequests = [request, request]

mockInsertDAO = MagicMock()
self.poller.daoFactory = MagicMock(return_value=mockInsertDAO)

import threading
mockThread = MagicMock()
with patch.object(threading, 'currentThread', return_value=mockThread):
self.poller.insertConfigRequests(mockConfig)

# Should only be called once despite two identical requests
self.assertEqual(mockInsertDAO.execute.call_count, 1)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Add a test for distinct requests sharing dataset+procVersion.

Current duplicate test only covers identical objects. Please add a case where InputDataset/ProcVersion are equal but run/lumi differ, and assert both are inserted.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@test/python/T0_t/Reprocessing_t/ReprocessingPoller_t.py` around lines 121 -
154, Extend test_duplicate_request_skipped in ReprocessingPoller_t to also cover
distinct requests that share InputDataset and ProcVersion but differ in
run/lumi: create two request dicts where 'InputDataset' and 'ProcVersion' are
identical but 'RunId'/'MinLumi'/'MaxLumi' differ, set
mockConfig.Global.ReprocessingRequests to [req1, req2], keep daoFactory
returning mockInsertDAO and call insertConfigRequests(self.poller, mockConfig)
the same way (using patched threading.currentThread), and assert
mockInsertDAO.execute.call_count == 2 to verify both were inserted; keep the
original duplicate-identical check as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant