From f2a7ce30c4647cb7c375b8a18e1c283fa92c8da7 Mon Sep 17 00:00:00 2001 From: Raveline Date: Tue, 13 Jan 2026 18:06:33 +0100 Subject: [PATCH 1/8] Add protection against failure to reserve jobs --- .../PostgreSQL/Consumers/Components.hs | 45 ++++++++++++------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 127e30a..48f9138 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -297,10 +297,12 @@ spawnDispatcher :: forall m idx job . ( MonadBaseControl IO m , MonadLog m + , MonadCatch m , MonadMask m , MonadTime m , Show idx , ToSQL idx + , FromSQL idx ) => ConsumerConfig m idx job -> ConnectionSourceM m @@ -360,23 +362,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs reserveJobs :: Int -> m ([job], Int) reserveJobs limit = runDBT cs ts $ do now <- currentTime - n <- - runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $ - smconcat - [ "UPDATE" <+> raw ccJobsTable <+> "SET" - , " reserved_by =" cid - , ", attempts = CASE" - , " WHEN finished_at IS NULL THEN attempts + 1" - , " ELSE 1" - , " END" - , "WHERE id IN (" <> reservedJobs now <> ")" - , "RETURNING" <+> mintercalate ", " ccJobSelectors - ] - -- Decode lazily as we want the transaction to be as short as possible. - (,n) . F.toList . fmap ccJobFetcher <$> queryResult - where - reservedJobs :: UTCTime -> SQL - reservedJobs now = + runPreparedSQL_ (preparedSqlName "getReservedIds" ccJobsTable) $ smconcat [ "SELECT id FROM" <+> raw ccJobsTable , "WHERE" @@ -387,6 +373,31 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs , "LIMIT" limit , "FOR UPDATE SKIP LOCKED" ] + jobIds :: [idx] <- fetchMany runIdentity + let onFailure (SomeException e) = do + logAttention "Failure to fetch the jobs, will reenqueue for 6 hours later" $ object [ "error" .= show e, "job_ids" .= show jobIds ] + let toUpdate :: [(idx, Result)] + toUpdate = (, Failed . RerunAfter . ihours $ 6) <$> jobIds + lift $ updateJobs toUpdate + pure ([], 0) + handle + onFailure + (do + logInfo "Reserving jobs" $ object ["job_ids" .= show jobIds] + n <- runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $ + smconcat + [ "UPDATE" <+> raw ccJobsTable <+> "SET" + , " reserved_by =" cid + , ", attempts = CASE" + , " WHEN finished_at IS NULL THEN attempts + 1" + , " ELSE 1" + , " END" + , "WHERE id = ANY(" Array1 jobIds <+> ")" + , "RETURNING" <+> mintercalate ", " ccJobSelectors + ] + -- Decode lazily as we want the transaction to be as short as possible. + (,n) . F.toList . fmap ccJobFetcher <$> queryResult + ) -- Spawn each job in a separate thread. startJob :: job -> m (job, m (T.Result Result)) From 666d3e71f584b6dd3d1e0eede3e4e1a1e33777cc Mon Sep 17 00:00:00 2001 From: Raveline Date: Thu, 15 Jan 2026 12:23:22 +0100 Subject: [PATCH 2/8] POC for single-row failure reenqueue --- .../PostgreSQL/Consumers/Components.hs | 86 +++++++++++-------- 1 file changed, 50 insertions(+), 36 deletions(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 48f9138..0ae2184 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -22,6 +22,7 @@ import Data.Foldable qualified as F import Data.Function import Data.Int import Data.Map.Strict qualified as M +import Data.Maybe (catMaybes) import Data.Monoid.Utils import Database.PostgreSQL.Consumers.Config import Database.PostgreSQL.Consumers.Consumer @@ -348,7 +349,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs . (`finally` subtractJobs) . restore $ do - mapM startJob batch >>= mapM joinJob >>= updateJobs + mapM startJob (catMaybes batch) >>= mapM joinJob >>= updateJobs when (batchSize == limit) $ do maxBatchSize <- atomically $ do @@ -359,45 +360,58 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs pure (batchSize > 0) - reserveJobs :: Int -> m ([job], Int) + reserveJobs :: Int -> m ([Maybe job], Int) reserveJobs limit = runDBT cs ts $ do now <- currentTime runPreparedSQL_ (preparedSqlName "getReservedIds" ccJobsTable) $ - smconcat - [ "SELECT id FROM" <+> raw ccJobsTable - , "WHERE" - , " reserved_by IS NULL" - , " AND run_at IS NOT NULL" - , " AND run_at <= " now - , " ORDER BY run_at" - , "LIMIT" limit - , "FOR UPDATE SKIP LOCKED" - ] + smconcat + [ "SELECT id FROM" <+> raw ccJobsTable + , "WHERE" + , " reserved_by IS NULL" + , " AND run_at IS NOT NULL" + , " AND run_at <= " now + , " ORDER BY run_at" + , "LIMIT" limit + , "FOR UPDATE SKIP LOCKED" + ] jobIds :: [idx] <- fetchMany runIdentity - let onFailure (SomeException e) = do - logAttention "Failure to fetch the jobs, will reenqueue for 6 hours later" $ object [ "error" .= show e, "job_ids" .= show jobIds ] - let toUpdate :: [(idx, Result)] - toUpdate = (, Failed . RerunAfter . ihours $ 6) <$> jobIds - lift $ updateJobs toUpdate - pure ([], 0) - handle - onFailure - (do - logInfo "Reserving jobs" $ object ["job_ids" .= show jobIds] - n <- runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $ - smconcat - [ "UPDATE" <+> raw ccJobsTable <+> "SET" - , " reserved_by =" cid - , ", attempts = CASE" - , " WHEN finished_at IS NULL THEN attempts + 1" - , " ELSE 1" - , " END" - , "WHERE id = ANY(" Array1 jobIds <+> ")" - , "RETURNING" <+> mintercalate ", " ccJobSelectors - ] - -- Decode lazily as we want the transaction to be as short as possible. - (,n) . F.toList . fmap ccJobFetcher <$> queryResult - ) + if null jobIds + then pure ([], 0) + else do + let onFailure (SomeException e) = do + logAttention "Failure to fetch the jobs, will reenqueue for 6 hours later" $ object ["error" .= show e, "job_ids" .= show jobIds] + let toUpdate :: [(idx, Result)] + toUpdate = (,Failed . RerunAfter . ihours $ 6) <$> jobIds + lift $ updateJobs toUpdate + pure ([], 0) + handle + onFailure + ( do + logInfo "Reserving jobs" $ object ["job_ids" .= show jobIds] + n <- + runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $ + smconcat + [ "UPDATE" <+> raw ccJobsTable <+> "SET" + , " reserved_by =" cid + , ", attempts = CASE" + , " WHEN finished_at IS NULL THEN attempts + 1" + , " ELSE 1" + , " END" + , "WHERE id = ANY(" Array1 jobIds <+> ")" + , "RETURNING id, " <+> mintercalate ", " ccJobSelectors + ] + qr <- queryResult + results <- forM (F.toList qr) $ \(jobIdRow :*: other) -> + let jobId = runIdentity jobIdRow + in handle + ( \(SomeException e) -> do + logAttention "Failure to fetch job, will reenqueue for 6 hours later" $ object ["error" .= show e, "job_id" .= show jobId] + lift $ updateJobs [(jobId, Failed . RerunAfter . ihours $ 6)] + pure Nothing + ) + (pure . Just $ ccJobFetcher other) + pure (results, n) + ) -- Spawn each job in a separate thread. startJob :: job -> m (job, m (T.Result Result)) From e51b93dd2be5110079577e0ec0a76f9acab915e9 Mon Sep 17 00:00:00 2001 From: Raveline Date: Thu, 15 Jan 2026 15:57:57 +0100 Subject: [PATCH 3/8] Separate handle to ensure the proper handle catches exceptions --- .../PostgreSQL/Consumers/Components.hs | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 0ae2184..79baea3 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -383,14 +383,9 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs let toUpdate :: [(idx, Result)] toUpdate = (,Failed . RerunAfter . ihours $ 6) <$> jobIds lift $ updateJobs toUpdate - pure ([], 0) - handle - onFailure - ( do - logInfo "Reserving jobs" $ object ["job_ids" .= show jobIds] - n <- - runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $ - smconcat + pure 0 + n <- handle + onFailure . runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $ smconcat [ "UPDATE" <+> raw ccJobsTable <+> "SET" , " reserved_by =" cid , ", attempts = CASE" @@ -400,8 +395,8 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs , "WHERE id = ANY(" Array1 jobIds <+> ")" , "RETURNING id, " <+> mintercalate ", " ccJobSelectors ] - qr <- queryResult - results <- forM (F.toList qr) $ \(jobIdRow :*: other) -> + qr <- queryResult + results <- forM (F.toList qr) $ \(jobIdRow :*: other) -> let jobId = runIdentity jobIdRow in handle ( \(SomeException e) -> do @@ -410,8 +405,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs pure Nothing ) (pure . Just $ ccJobFetcher other) - pure (results, n) - ) + pure (results, n) -- Spawn each job in a separate thread. startJob :: job -> m (job, m (T.Result Result)) From 20e72ca3fd3d04f846adf5bb161701c5ad715bb2 Mon Sep 17 00:00:00 2001 From: Raveline Date: Mon, 19 Jan 2026 07:22:23 +0100 Subject: [PATCH 4/8] Keep the queryResult call inside a handle block --- .../PostgreSQL/Consumers/Components.hs | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 79baea3..21de8ba 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -377,15 +377,19 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs jobIds :: [idx] <- fetchMany runIdentity if null jobIds then pure ([], 0) - else do - let onFailure (SomeException e) = do + else + handle + ( \(SomeException e) -> do logAttention "Failure to fetch the jobs, will reenqueue for 6 hours later" $ object ["error" .= show e, "job_ids" .= show jobIds] let toUpdate :: [(idx, Result)] toUpdate = (,Failed . RerunAfter . ihours $ 6) <$> jobIds lift $ updateJobs toUpdate - pure 0 - n <- handle - onFailure . runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $ smconcat + pure ([], 0) + ) + ( do + n <- + runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $ + smconcat [ "UPDATE" <+> raw ccJobsTable <+> "SET" , " reserved_by =" cid , ", attempts = CASE" @@ -395,8 +399,8 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs , "WHERE id = ANY(" Array1 jobIds <+> ")" , "RETURNING id, " <+> mintercalate ", " ccJobSelectors ] - qr <- queryResult - results <- forM (F.toList qr) $ \(jobIdRow :*: other) -> + qr <- queryResult + results <- forM (F.toList qr) $ \(jobIdRow :*: other) -> let jobId = runIdentity jobIdRow in handle ( \(SomeException e) -> do @@ -405,7 +409,8 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs pure Nothing ) (pure . Just $ ccJobFetcher other) - pure (results, n) + pure (results, n) + ) -- Spawn each job in a separate thread. startJob :: job -> m (job, m (T.Result Result)) From 2d2b3a9c4c544f511b25ae9d2eb2e5973ab080a9 Mon Sep 17 00:00:00 2001 From: Raveline Date: Mon, 19 Jan 2026 10:08:56 +0100 Subject: [PATCH 5/8] Completely drop lazy evaluation when fetching jobs --- consumers/src/Database/PostgreSQL/Consumers/Components.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 21de8ba..36887f1 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -10,7 +10,7 @@ import Control.Concurrent.Lifted import Control.Concurrent.STM hiding (atomically) import Control.Concurrent.STM qualified as STM import Control.Concurrent.Thread.Lifted qualified as T -import Control.Exception (AsyncException (ThreadKilled)) +import Control.Exception (AsyncException (ThreadKilled), evaluate) import Control.Exception.Safe qualified as ES import Control.Monad import Control.Monad.Base @@ -408,7 +408,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs lift $ updateJobs [(jobId, Failed . RerunAfter . ihours $ 6)] pure Nothing ) - (pure . Just $ ccJobFetcher other) + (liftBase . evaluate $ Just $! ccJobFetcher other) pure (results, n) ) From f5fad4ffaff24c585b15baaa47cac87e8fe175d1 Mon Sep 17 00:00:00 2001 From: Raveline Date: Thu, 5 Mar 2026 16:37:18 +0100 Subject: [PATCH 6/8] Add tests to check retry mechanism on fetch failures --- .../PostgreSQL/Consumers/Components.hs | 4 ++ consumers/test/Test.hs | 65 +++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 36887f1..5d3aeb9 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -383,7 +383,9 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs logAttention "Failure to fetch the jobs, will reenqueue for 6 hours later" $ object ["error" .= show e, "job_ids" .= show jobIds] let toUpdate :: [(idx, Result)] toUpdate = (,Failed . RerunAfter . ihours $ 6) <$> jobIds + rollback lift $ updateJobs toUpdate + commit pure ([], 0) ) ( do @@ -405,7 +407,9 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs in handle ( \(SomeException e) -> do logAttention "Failure to fetch job, will reenqueue for 6 hours later" $ object ["error" .= show e, "job_id" .= show jobId] + rollback lift $ updateJobs [(jobId, Failed . RerunAfter . ihours $ 6)] + commit pure Nothing ) (liftBase . evaluate $ Just $! ccJobFetcher other) diff --git a/consumers/test/Test.hs b/consumers/test/Test.hs index fd2b61d..c557d8c 100644 --- a/consumers/test/Test.hs +++ b/consumers/test/Test.hs @@ -103,6 +103,55 @@ test = do rowcount1 :: Int64 <- fetchOne runIdentity liftIO $ T.assertEqual "Number of jobs in table after 10 steps is 1024" 1024 rowcount0 liftIO $ T.assertEqual "Number of jobs in table after 11 steps is 0" 0 rowcount1 + + -- Checking the failing mechanism for a single job. + putJob 1 >> commit + do + -- Move time forward again, since we enqueued new jobs. + modifyTestTime $ addUTCTime (2 * 60 * 60) + finalize + ( localDomain "process" $ + runConsumerWithIdleSignal consumerFailingSingleJobConfig connSource idleSignal + ) + $ do + waitUntilTrue idleSignal + currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . show) + + runSQL_ "SELECT COUNT(*) from consumers_test_jobs" + rowcount2 :: Int64 <- fetchOne runIdentity + runSQL_ "SELECT run_at from consumers_test_jobs" + newRunTime :: UTCTime <- fetchOne runIdentity + liftIO $ do + T.assertEqual "The failed job should still be in the table" 1 rowcount2 + areInSixHours [newRunTime] + -- Clean up. + runSQL_ "DELETE FROM consumers_test_jobs;" + + -- Checking the failing mechanism for multiple jobs + let nbOfJobsToFail = 5 + replicateM_ nbOfJobsToFail (putJob 1) >> commit + do + -- Move time forward again, since we enqueued new jobs. + modifyTestTime $ addUTCTime (2 * 60 * 60) + -- Jobs get processed one at a time, so we need to run this as many times as there are jobs + replicateM_ nbOfJobsToFail $ do + finalize + ( localDomain "process" $ + runConsumerWithIdleSignal consumerFailingAllJobsConfig connSource idleSignal + ) + $ do + waitUntilTrue idleSignal + currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . show) + + runSQL_ "SELECT run_at from consumers_test_jobs" + newRunTimes :: [UTCTime] <- fetchMany runIdentity + liftIO $ do + T.assertEqual "All jobs should fail if the query to fetch them is wrong" 5 (length newRunTimes) + areInSixHours newRunTimes + -- Clean up. + runSQL_ "DELETE FROM consumers_test_jobs" + + -- All done. dropTables where waitUntilTrue tmvar = liftIO . atomically $ do @@ -113,6 +162,13 @@ test = do printUsage = do prog <- getProgName putStrLn $ "Usage: " <> prog <> " " + + areInSixHours toCheck = do + let inXHours n = addUTCTime (60 * 60 * n) <$> currentTime + in5Hours <- inXHours 5 + in7Hours <- inXHours 7 + liftIO $ T.assertBool "The failed jobs should be planned to run more than 5 hours later" $ all (in5Hours <) toCheck + liftIO $ T.assertBool "The failed jobs should be planned to run less than 7 hours later" $ all (in7Hours >) toCheck definitions = emptyDbDefinitions {dbTables = [consumersTable, jobsTable]} -- NB: order of migrations is important. @@ -156,6 +212,15 @@ test = do , ccJobLogData = \(i, _) -> ["job_id" .= i] } + simulatingFailure :: (Int64, Int32) -> (Int64, Int32) + simulatingFailure _ = error "Simulating row fetch error" + + consumerFailingSingleJobConfig = + consumerConfig { ccJobFetcher = simulatingFailure } + + consumerFailingAllJobsConfig = + consumerConfig { ccJobSelectors = ["id", "countdown::bigint"] } + putJob :: Int32 -> TestEnv () putJob countdown = localDomain "put" $ do now <- currentTime From f305eca7532fc428583259b739fbc525d903a498 Mon Sep 17 00:00:00 2001 From: Raveline Date: Fri, 6 Mar 2026 15:47:58 +0100 Subject: [PATCH 7/8] Apply formatting --- .../Database/PostgreSQL/Consumers/Components.hs | 2 +- consumers/test/Test.hs | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 5d3aeb9..9fef56f 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -412,7 +412,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs commit pure Nothing ) - (liftBase . evaluate $ Just $! ccJobFetcher other) + (liftBase . evaluate $ Just $! ccJobFetcher other) pure (results, n) ) diff --git a/consumers/test/Test.hs b/consumers/test/Test.hs index c557d8c..471f863 100644 --- a/consumers/test/Test.hs +++ b/consumers/test/Test.hs @@ -146,7 +146,7 @@ test = do runSQL_ "SELECT run_at from consumers_test_jobs" newRunTimes :: [UTCTime] <- fetchMany runIdentity liftIO $ do - T.assertEqual "All jobs should fail if the query to fetch them is wrong" 5 (length newRunTimes) + T.assertEqual "All jobs should fail if the query to fetch them is wrong" 5 (length newRunTimes) areInSixHours newRunTimes -- Clean up. runSQL_ "DELETE FROM consumers_test_jobs" @@ -162,13 +162,13 @@ test = do printUsage = do prog <- getProgName putStrLn $ "Usage: " <> prog <> " " - + areInSixHours toCheck = do let inXHours n = addUTCTime (60 * 60 * n) <$> currentTime in5Hours <- inXHours 5 in7Hours <- inXHours 7 - liftIO $ T.assertBool "The failed jobs should be planned to run more than 5 hours later" $ all (in5Hours <) toCheck - liftIO $ T.assertBool "The failed jobs should be planned to run less than 7 hours later" $ all (in7Hours >) toCheck + liftIO . T.assertBool "The failed jobs should be planned to run more than 5 hours later" $ all (in5Hours <) toCheck + liftIO . T.assertBool "The failed jobs should be planned to run less than 7 hours later" $ all (in7Hours >) toCheck definitions = emptyDbDefinitions {dbTables = [consumersTable, jobsTable]} -- NB: order of migrations is important. @@ -216,10 +216,10 @@ test = do simulatingFailure _ = error "Simulating row fetch error" consumerFailingSingleJobConfig = - consumerConfig { ccJobFetcher = simulatingFailure } - + consumerConfig {ccJobFetcher = simulatingFailure} + consumerFailingAllJobsConfig = - consumerConfig { ccJobSelectors = ["id", "countdown::bigint"] } + consumerConfig {ccJobSelectors = ["id", "countdown::bigint"]} putJob :: Int32 -> TestEnv () putJob countdown = localDomain "put" $ do From c564f35726d4ddb40b5559e14a6dde6ebd7cfee8 Mon Sep 17 00:00:00 2001 From: Raveline Date: Thu, 19 Mar 2026 14:32:29 +0100 Subject: [PATCH 8/8] Apply PR requests --- .../PostgreSQL/Consumers/Components.hs | 68 +++++++++---------- 1 file changed, 33 insertions(+), 35 deletions(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 9fef56f..d8b72c8 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -298,7 +298,6 @@ spawnDispatcher :: forall m idx job . ( MonadBaseControl IO m , MonadLog m - , MonadCatch m , MonadMask m , MonadTime m , Show idx @@ -330,7 +329,8 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs loop :: Int -> m Bool loop limit = do - (batch, batchSize) <- reserveJobs limit + batch <- reserveJobs limit + let batchSize = length batch when (batchSize > 0) $ do logInfo "Processing batch" $ object @@ -349,7 +349,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs . (`finally` subtractJobs) . restore $ do - mapM startJob (catMaybes batch) >>= mapM joinJob >>= updateJobs + mapM startJob batch >>= mapM joinJob >>= updateJobs when (batchSize == limit) $ do maxBatchSize <- atomically $ do @@ -360,7 +360,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs pure (batchSize > 0) - reserveJobs :: Int -> m ([Maybe job], Int) + reserveJobs :: Int -> m [job] reserveJobs limit = runDBT cs ts $ do now <- currentTime runPreparedSQL_ (preparedSqlName "getReservedIds" ccJobsTable) $ @@ -376,46 +376,44 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs ] jobIds :: [idx] <- fetchMany runIdentity if null jobIds - then pure ([], 0) + then pure [] else handle ( \(SomeException e) -> do - logAttention "Failure to fetch the jobs, will reenqueue for 6 hours later" $ object ["error" .= show e, "job_ids" .= show jobIds] - let toUpdate :: [(idx, Result)] - toUpdate = (,Failed . RerunAfter . ihours $ 6) <$> jobIds + logAttention "Failure to fetch the jobs, will reenqueue for the next day" $ object ["error" .= show e, "job_ids" .= show jobIds] rollback - lift $ updateJobs toUpdate - commit - pure ([], 0) + lift $ fetchFailureHandler jobIds + pure [] ) ( do - n <- - runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $ - smconcat - [ "UPDATE" <+> raw ccJobsTable <+> "SET" - , " reserved_by =" cid - , ", attempts = CASE" - , " WHEN finished_at IS NULL THEN attempts + 1" - , " ELSE 1" - , " END" - , "WHERE id = ANY(" Array1 jobIds <+> ")" - , "RETURNING id, " <+> mintercalate ", " ccJobSelectors - ] + runPreparedSQL_ (preparedSqlName "setReservation" ccJobsTable) $ + smconcat + [ "UPDATE" <+> raw ccJobsTable <+> "SET" + , " reserved_by =" cid + , ", attempts = CASE" + , " WHEN finished_at IS NULL THEN attempts + 1" + , " ELSE 1" + , " END" + , "WHERE id = ANY(" Array1 jobIds <+> ")" + , "RETURNING id, " <+> mintercalate ", " ccJobSelectors + ] qr <- queryResult - results <- forM (F.toList qr) $ \(jobIdRow :*: other) -> - let jobId = runIdentity jobIdRow - in handle - ( \(SomeException e) -> do - logAttention "Failure to fetch job, will reenqueue for 6 hours later" $ object ["error" .= show e, "job_id" .= show jobId] - rollback - lift $ updateJobs [(jobId, Failed . RerunAfter . ihours $ 6)] - commit - pure Nothing - ) - (liftBase . evaluate $ Just $! ccJobFetcher other) - pure (results, n) + results <- forM (F.toList qr) $ \(rawJobId :*: other) -> do + let jobId = runIdentity rawJobId + (Just <$> liftBase (evaluate $ ccJobFetcher other)) `catch` \(SomeException e) -> do + logAttention "Failure to fetch job, will reenqueue for the next day" $ object ["error" .= show e, "job_id" .= show jobId] + rollback + lift $ fetchFailureHandler [jobId] + pure Nothing + pure $ catMaybes results ) + fetchFailureHandler :: [idx] -> m () + fetchFailureHandler jobIds = do + let toUpdate :: [(idx, Result)] + toUpdate = (,Failed . RerunAfter . idays $ 1) <$> jobIds + updateJobs toUpdate + -- Spawn each job in a separate thread. startJob :: job -> m (job, m (T.Result Result)) startJob job = do