Add protection against failure to reserve jobs #41

Draft
Raveline wants to merge 8 commits into master from reenqueue-unparseable-jobs-take-2

Conversation

@Raveline
Contributor

WIP

@Raveline Raveline force-pushed the reenqueue-unparseable-jobs-take-2 branch 3 times, most recently from fdc55bf to e53381c Compare January 14, 2026 17:09
@Raveline
Contributor Author

All right, this version works when a query in the consumers is wrong. Here is an example produced by adding a non-existent column:

  "error": "DBException {dbeQueryContext = SQL \"UPDATE chargebee_tasks SET   reserved_by = <16> , attempts = CASE     WHEN finished_at IS NULL THEN attempts + 1     ELSE 1   END WHERE id = ANY( <Array1 []> ) RETURNING id, event, attempts, surprise_surprise\", dbeBackendPid = 500778, dbeError = DetailedQueryError {qeSeverity = \"ERROR\", qeErrorCode = UndefinedColumn, qeMessagePrimary = \"column \\\"surprise_surprise\\\" does not exist\", qeMessageDetail = Nothing, qeMessageHint = Nothing, qeStatementPosition = Just 181, qeInternalPosition = Nothing, qeInternalQuery = Nothing, qeContext = Nothing, qeSourceFile = Just \"parse_relation.c\", qeSourceLine = Just 3651, qeSourceFunction = Just \"errorMissingColumn\"}, dbeCallStack = [(\"runPreparedSQL\",SrcLoc {srcLocPackage = \"consumers-2.3.4.0-83af746950b8f91e9e93ca91ee357d2eb6bad519fa73eb8edc6643cfa528cbf7\", srcLocModule = \"Database.PostgreSQL.Consumers.Components\", srcLocFile = \"src/Database/PostgreSQL/Consumers/Components.hs\", srcLocStartLine = 387, srcLocStartCol = 17, srcLocEndLine = 387, srcLocEndCol = 31})]}",

Though technically, in its current state, it says it will reenqueue tasks even if there are no tasks (and the error will pop up every so often, but that's OK: it's pretty bad to have a query that fails even when there are no actual jobs to run), so I suspect we need two different handlers.

@Raveline Raveline force-pushed the reenqueue-unparseable-jobs-take-2 branch from 1006cf5 to 0d15769 Compare January 15, 2026 11:28
@Raveline
Contributor Author

The latest commit also guarantees that a single job failing to parse no longer fails the whole reservation or blocks the other jobs. However, the update logic doesn't seem to work yet for some reason :(

@Raveline Raveline force-pushed the reenqueue-unparseable-jobs-take-2 branch from 33b0e21 to b47da37 Compare January 23, 2026 11:04
@Raveline
Contributor Author

Raveline commented Mar 6, 2026

Build errors are unrelated (but I suspect we're missing some lib install in the CI). This works, and is tested. Now, the main issue is having all these rollbacks / commits in the middle of the consumer loop - it's not great. Another solution would be to have a savepoint, but I think it actually makes sense to roll back in this context. @arybczak any opinions or better solutions?

Collaborator

@arybczak arybczak left a comment

> Another solution would be to have a savepoint

Nah, this one is good because it rolls back only when there's an error, so the common path stays efficient. A savepoint would always have to be created, adding unnecessary overhead to each reservation.

pure (batchSize > 0)

- reserveJobs :: Int -> m ([job], Int)
+ reserveJobs :: Int -> m ([Maybe job], Int)
Collaborator

This no longer works as advertised: if it returns ([Nothing], 1), the batch size lies and the when (batchSize > 0) $ do block above is entered unnecessarily.

I think it's best to return m [job], i.e. get rid of the Maybes before returning from this helper, and calculate batchSize using length in the loop.
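
A minimal sketch of that suggestion, using only base. The names Job, parseRow, reserveJobsSketch and batchSizeOf are hypothetical stand-ins and not the library's real definitions; the point is only that rows that fail to parse are dropped before returning, so the length of the result is an honest batch size.

```haskell
import Data.Maybe (catMaybes)

-- Hypothetical stand-in for the real job type.
newtype Job = Job Int deriving (Eq, Show)

-- Hypothetical stand-in for the row parser: odd ids fail to parse.
parseRow :: Int -> Maybe Job
parseRow n
  | even n = Just (Job n)
  | otherwise = Nothing

-- Drop unparseable rows before returning, so callers only see real jobs.
reserveJobsSketch :: Monad m => [Int] -> m [Job]
reserveJobsSketch rows = pure (catMaybes (map parseRow rows))

-- The caller derives the batch size from the jobs it actually received.
batchSizeOf :: [Job] -> Int
batchSizeOf = length
```

With this shape, a reservation where every row failed to parse yields an empty list and a batch size of 0, so a guard such as when (batchSize > 0) is not entered.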

else
handle
( \(SomeException e) -> do
logAttention "Failure to fetch the jobs, will reenqueue for 6 hours later" $ object ["error" .= show e, "job_ids" .= show jobIds]
Collaborator

Why 6 hours? #39 does 1 day.

Also, this part of the exception handler, up to pure, is the same as the one below; you can make a helper function out of it.

Contributor Author

I don't remember why I changed it. I've set it back to one day.

Unfortunately, factoring this out is hard here:

  • If I want to factor out the rollback, GHC demands that I add MonadDB constraints EVERYWHERE;
  • The log messages are different, and use different objects.

So the only part I really managed to unify is the update logic - which is not nothing, since it gives us a single place to define the delay before the job gets retried.
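
As a rough illustration of what "a single place to define the delay" can look like, here is a self-contained sketch. Hours, Action, Result, retryDelay and markForRetry are simplified, hypothetical stand-ins, not the types or helpers used in this PR; the real code builds the value with Failed . RerunAfter and an interval helper.

```haskell
-- Hypothetical, simplified stand-ins for the library's result types.
newtype Hours = Hours Int deriving (Eq, Show)

data Action = RerunAfter Hours deriving (Eq, Show)

data Result = Ok | Failed Action deriving (Eq, Show)

-- The only place where the retry delay is defined (one day, as in #39).
retryDelay :: Hours
retryDelay = Hours 24

-- Pair every job id with the same "failed, rerun later" result.
markForRetry :: [jobId] -> [(jobId, Result)]
markForRetry jobIds =
  [(jobId, Failed (RerunAfter retryDelay)) | jobId <- jobIds]
```

Both exception handlers can then call the shared helper and keep their own log messages, so changing the delay touches one definition only.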

toUpdate = (,Failed . RerunAfter . ihours $ 6) <$> jobIds
rollback
lift $ updateJobs toUpdate
commit
Collaborator

No need for an explicit commit: it's going to be committed right after, since we exit runDBT immediately.

, "RETURNING id, " <+> mintercalate ", " ccJobSelectors
]
qr <- queryResult
results <- forM (F.toList qr) $ \(jobIdRow :*: other) ->
Collaborator

Suggested change
- results <- forM (F.toList qr) $ \(jobIdRow :*: other) ->
+ results <- forM (F.toList qr) $ \(rawJobId :*: jobSelectors) ->

jobIdRow is a weird name; I'm not sure how it's a row. And other is non-descriptive.

Also, you can use fetchMany for less cruft. The explicit queryResult in the original code was used for lazy processing, but we're forgoing that here.

Contributor Author

fetchMany again demands a cascade of constraints, so I opted to keep it as is.

qr <- queryResult
results <- forM (F.toList qr) $ \(jobIdRow :*: other) ->
let jobId = runIdentity jobIdRow
in handle
Collaborator

I'd do

(Just <$> liftBase (evaluate $ ccJobFetcher other)) `catch` \(SomeException e) -> do
   ...

That seems more readable to me, since you immediately see which code is being handled.

I also extracted Just outside the evaluate because the combination of evaluate, $ and $! isn't great for readability imo :P
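
For readers less familiar with the pattern, here is a self-contained sketch of it in plain IO, using only Control.Exception from base. parseJob and safeParse are hypothetical names; parseJob stands in for a pure fetcher such as ccJobFetcher that may throw when forced. Note that evaluate only forces the value to weak head normal form, which is enough for the Int used here.

```haskell
import Control.Exception (SomeException (..), catch, evaluate)

-- Hypothetical pure parser that throws on bad input.
parseJob :: Int -> Int
parseJob n
  | n < 0 = error "unparseable row"
  | otherwise = n * 2

-- Force the parser inside IO so an exception hidden in the pure
-- value surfaces here, where the handler can turn it into Nothing.
safeParse :: Int -> IO (Maybe Int)
safeParse row =
  (Just <$> evaluate (parseJob row)) `catch` \(SomeException _) -> pure Nothing
```

Keeping Just outside evaluate means the guarded expression is exactly the parser call, and the handler sits right next to it.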

Contributor Author

Much better, thank you!

]
qr <- queryResult
results <- forM (F.toList qr) $ \(jobIdRow :*: other) ->
let jobId = runIdentity jobIdRow
Collaborator

If you make a do block above, you won't need in for the let.

:: forall m idx job
. ( MonadBaseControl IO m
, MonadLog m
, MonadCatch m
Collaborator

Redundant, MonadMask implies it.
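
A small sketch of why the constraint is redundant, assuming the exceptions package (Control.Monad.Catch), where MonadCatch is a superclass of MonadMask. The function orDefault is a hypothetical example, not part of this PR.

```haskell
import Control.Exception (SomeException (..))
import Control.Monad.Catch (MonadMask, catch)

-- Only MonadMask is listed, yet catch (a MonadCatch method) is usable,
-- because MonadCatch is a superclass of MonadMask.
orDefault :: MonadMask m => a -> m a -> m a
orDefault fallback action =
  action `catch` \(SomeException _) -> pure fallback
```

Since this type-checks with MonadMask alone, listing MonadCatch next to it in a signature adds nothing.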

@Raveline Raveline force-pushed the reenqueue-unparseable-jobs-take-2 branch from ebfc9da to 77b6504 Compare March 19, 2026 15:04
@Raveline Raveline force-pushed the reenqueue-unparseable-jobs-take-2 branch from 77b6504 to c564f35 Compare March 20, 2026 14:34